CDS: introduce AbstractThreePhaseCommitCohort
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 504612a05f707381ba0f26a0b899e05dae68d289..0b863623b4b9877cfc1553456994d241008f35ab 100644 (file)
@@ -21,7 +21,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +41,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -84,6 +82,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
+    private static enum TransactionState {
+        OPEN,
+        READY,
+        CLOSED,
+    }
+
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -187,7 +191,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private final ActorContext actorContext;
     private final String transactionChainId;
     private final SchemaContext schemaContext;
-    private boolean inReadyState;
+    private TransactionState state = TransactionState.OPEN;
 
     private volatile boolean initialized;
     private Semaphore operationLimiter;
@@ -293,7 +297,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(!inReadyState,
+        Preconditions.checkState(state == TransactionState.OPEN,
                 "Transaction is sealed - further modifications are not allowed");
     }
 
@@ -326,7 +330,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
-
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -381,26 +384,34 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         });
     }
 
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
+    private boolean seal(final TransactionState newState) {
+        if (state == TransactionState.OPEN) {
+            state = newState;
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        checkModificationState();
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Read-only transactions cannot be readied");
 
-        inReadyState = true;
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
                     txFutureCallbackMap.size());
 
         if (txFutureCallbackMap.isEmpty()) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
         throttleOperation(txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
@@ -425,22 +436,22 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
             getIdentifier().toString());
     }
 
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
     @Override
     public void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            if (state == TransactionState.CLOSED) {
+                // Idempotent no-op as per AutoCloseable recommendation
+                return;
+            }
+
+            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+                getIdentifier()));
+        }
+
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override