BUG-5280: add basic concept of ClientSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
index 519763ac021989df012cb6ac05eba80140acdfb3..1be84643350acdd197f5ab5dfe10dfcf558b146e 100644 (file)
@@ -49,7 +49,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
     @GuardedBy("this")
-    private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+    private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
     @GuardedBy("this")
     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
 
@@ -99,7 +99,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
             LOG.debug("Force-closing history {}", getIdentifier(), cause);
 
             synchronized (this) {
-                for (ClientTransaction t : openTransactions.values()) {
+                for (AbstractClientHandle<?> t : openTransactions.values()) {
                     t.localAbort(cause);
                 }
                 openTransactions.clear();
@@ -136,32 +136,41 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         LOG.debug("Create history response {}", response);
     }
 
-    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+    private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
         while (true) {
-            final ProxyHistory history;
             try {
-                history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+                return histories.computeIfAbsent(shard, this::createHistoryProxy);
             } catch (InversibleLockException e) {
                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
                 e.awaitResolution();
                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
-                continue;
             }
+        }
+    }
+
+    final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
+        return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
+    }
+
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+        return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
+    }
 
-            return history.createTransactionProxy(transactionId);
+    private void checkNotClosed() {
+        if (state == State.CLOSED) {
+            throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
         }
     }
 
     /**
-     * Allocate a {@link ClientTransaction}.
+     * Allocate a new {@link ClientTransaction}.
      *
      * @return A new {@link ClientTransaction}
      * @throws TransactionChainClosedException if this history is closed
+     * @throws IllegalStateException if a previous dependent transaction has not been closed
      */
     public final ClientTransaction createTransaction() {
-        if (state == State.CLOSED) {
-            throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
-        }
+        checkNotClosed();
 
         synchronized (this) {
             final ClientTransaction ret = doCreateTransaction();
@@ -170,6 +179,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         }
     }
 
+    /**
+     * Create a new {@link ClientSnapshot}.
+     *
+     * @return A new {@link ClientSnapshot}
+     * @throws TransactionChainClosedException if this history is closed
+     * @throws IllegalStateException if a previous dependent transaction has not been closed
+     */
+    public final ClientSnapshot takeSnapshot() {
+        checkNotClosed();
+
+        synchronized (this) {
+            final ClientSnapshot ret = doCreateSnapshot();
+            openTransactions.put(ret.getIdentifier(), ret);
+            return ret;
+        }
+    }
+
+    @GuardedBy("this")
+    abstract ClientSnapshot doCreateSnapshot();
+
     @GuardedBy("this")
     abstract ClientTransaction doCreateTransaction();
 
@@ -179,10 +208,12 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      * @param txId Transaction identifier
      * @param cohort Transaction commit cohort
      */
-    synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+    synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
             final AbstractTransactionCommitCohort cohort) {
-        final ClientTransaction tx = openTransactions.remove(txId);
-        Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+        final TransactionIdentifier txId = tx.getIdentifier();
+        if (openTransactions.remove(txId) == null) {
+            LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
+        }
 
         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
@@ -196,11 +227,11 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
      * backend.
      *
-     * @param txId transaction identifier
+     * @param snapshot transaction identifier
      */
-    synchronized void onTransactionAbort(final TransactionIdentifier txId) {
-        if (openTransactions.remove(txId) == null) {
-            LOG.warn("Could not find aborting transaction {}", txId);
+    synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
+        if (openTransactions.remove(snapshot.getIdentifier()) == null) {
+            LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
         }
     }