CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index a2a7a12..82258b4 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
- * <p>
- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
- * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
- * be created on each of those shards by the TransactionProxy
- *</p>
- * <p>
- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
- * shards will be executed.
- * </p>
+ * A transaction potentially spanning multiple backend shards.
  */
 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
-
-    public static enum TransactionType {
-        READ_ONLY,
-        WRITE_ONLY,
-        READ_WRITE;
-
-        // Cache all values
-        private static final TransactionType[] VALUES = values();
-
-        public static TransactionType fromInt(final int type) {
-            try {
-                return VALUES[type];
-            } catch (IndexOutOfBoundsException e) {
-                throw new IllegalArgumentException("In TransactionType enum value " + type, e);
-            }
-        }
-    }
-
     private static enum TransactionState {
         OPEN,
         READY,
         CLOSED,
     }
-
-    static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
-                                                              new Mapper<Throwable, Throwable>() {
-        @Override
-        public Throwable apply(Throwable failure) {
-            return failure;
-        }
-    };
-
-    private static final AtomicLong counter = new AtomicLong();
-
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
-    /**
-     * Time interval in between transaction create retries.
-     */
-    private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
-            FiniteDuration.create(1, TimeUnit.SECONDS);
-
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    /**
-     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
-     * garbage collected. This is used for read-only transactions as they're not explicitly closed
-     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
-     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
-     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
-     * to the old generation space and thus should be cleaned up in a timely manner as the GC
-     * runs on the young generation (eden, swap1...) space much more frequently.
-     */
-    private static class TransactionProxyCleanupPhantomReference
-                                           extends FinalizablePhantomReference<TransactionProxy> {
-
-        private final List<ActorSelection> remoteTransactionActors;
-        private final AtomicBoolean remoteTransactionActorsMB;
-        private final ActorContext actorContext;
-        private final TransactionIdentifier identifier;
-
-        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-            super(referent, phantomReferenceQueue);
-
-            // Note we need to cache the relevant fields from the TransactionProxy as we can't
-            // have a hard reference to the TransactionProxy instance itself.
-
-            remoteTransactionActors = referent.remoteTransactionActors;
-            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-            actorContext = referent.actorContext;
-            identifier = referent.getIdentifier();
-        }
-
-        @Override
-        public void finalizeReferent() {
-            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                    remoteTransactionActors.size(), identifier);
-
-            phantomReferenceCache.remove(this);
-
-            // Access the memory barrier volatile to ensure all previous updates to the
-            // remoteTransactionActors list are visible to this thread.
-
-            if(remoteTransactionActorsMB.get()) {
-                for(ActorSelection actor : remoteTransactionActors) {
-                    LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-                }
-            }
-        }
-    }
-
-    /**
-     * Stores the remote Tx actors for each requested data store path to be used by the
-     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
-     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
-     * remoteTransactionActors list so they will be visible to the thread accessing the
-     * PhantomReference.
-     */
-    private List<ActorSelection> remoteTransactionActors;
-    private volatile AtomicBoolean remoteTransactionActorsMB;
-
-    /**
-     * Stores the create transaction results per shard.
-     */
-    private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
-
-    private final TransactionType transactionType;
-    private final ActorContext actorContext;
-    private final String transactionChainId;
-    private final SchemaContext schemaContext;
+    private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
+    private final AbstractTransactionContextFactory<?> txContextFactory;
+    private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
-
-    private volatile boolean initialized;
-    private Semaphore operationLimiter;
-    private OperationCompleter operationCompleter;
-
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, "");
-    }
-
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
-        super(createIdentifier(actorContext));
-        this.actorContext = Preconditions.checkNotNull(actorContext,
-            "actorContext should not be null");
-        this.transactionType = Preconditions.checkNotNull(transactionType,
-            "transactionType should not be null");
-        this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
-            "schemaContext should not be null");
-        this.transactionChainId = transactionChainId;
-
-        LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
-    }
-
-    private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
-        String memberName = actorContext.getCurrentMemberName();
-        if (memberName == null) {
-            memberName = "UNKNOWN-MEMBER";
-        }
-
-        return new TransactionIdentifier(memberName, counter.getAndIncrement());
-    }
-
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if (transactionContext != null) {
-                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
-            }
-        }
-
-        return recordedOperationFutures;
-    }
+    private volatile OperationCompleter operationCompleter;
+    private volatile Semaphore operationLimiter;
 
     @VisibleForTesting
-    boolean hasTransactionContext() {
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                return true;
-            }
-        }
+    public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
+        super(txContextFactory.nextIdentifier(), false);
+        this.txContextFactory = txContextFactory;
+        this.type = Preconditions.checkNotNull(type);
 
-        return false;
+        LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
     @Override
-    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-
-        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
-                "Read operation on write-only transaction is not allowed");
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
 
-        LOG.debug("Tx {} read {}", getIdentifier(), path);
+        LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
         throttleOperation();
 
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
-
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
+                transactionContext.dataExists(path, proxyFuture);
             }
         });
 
@@ -274,93 +90,86 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-
-        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
-                "Exists operation on write-only transaction is not allowed");
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
 
-        LOG.debug("Tx {} exists {}", getIdentifier(), path);
+        LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        throttleOperation();
+        if (YangInstanceIdentifier.EMPTY.equals(path)) {
+            return readAllData();
+        } else {
+            throttleOperation();
 
-        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+            return singleShardRead(shardNameFromIdentifier(path), path);
+        }
+    }
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
+            final String shardName, final YangInstanceIdentifier path) {
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+        TransactionContextWrapper contextAdapter = getContextAdapter(shardName);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.dataExists(path, proxyFuture);
+                transactionContext.readData(path, proxyFuture);
             }
         });
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
-    private void checkModificationState() {
-        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
-                "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(state == TransactionState.OPEN,
-                "Transaction is sealed - further modifications are not allowed");
-    }
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
+        final Set<String> allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames();
+        final Collection<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size());
 
-    private void throttleOperation() {
-        throttleOperation(1);
-    }
-
-    private void throttleOperation(int acquirePermits) {
-        if(!initialized) {
-            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-            operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
-            operationCompleter = new OperationCompleter(operationLimiter);
-
-            // Make sure we write this last because it's volatile and will also publish the non-volatile writes
-            // above as well so they'll be visible to other threads.
-            initialized = true;
+        for (String shardName : allShardNames) {
+            futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY));
         }
 
-        try {
-            if(!operationLimiter.tryAcquire(acquirePermits,
-                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
-                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
-            }
-        } catch (InterruptedException e) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> listFuture = Futures.allAsList(futures);
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> aggregateFuture;
+
+        aggregateFuture = Futures.transform(listFuture, new Function<List<Optional<NormalizedNode<?, ?>>>, Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public Optional<NormalizedNode<?, ?>> apply(final List<Optional<NormalizedNode<?, ?>>> input) {
+                try {
+                    return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, txContextFactory.getActorContext().getSchemaContext());
+                } catch (DataValidationFailedException e) {
+                    throw new IllegalArgumentException("Failed to aggregate", e);
+                }
             }
-        }
+        });
+
+        return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER);
     }
 
     @Override
-    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-
+    public void delete(final YangInstanceIdentifier path) {
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", getIdentifier(), path);
+        LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.writeData(path, data);
+                transactionContext.deleteData(path);
             }
         });
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-
         checkModificationState();
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -369,23 +178,29 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public void delete(final YangInstanceIdentifier path) {
-
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState();
 
-        LOG.debug("Tx {} delete {}", getIdentifier(), path);
+        LOG.debug("Tx {} write {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.deleteData(path);
+                transactionContext.writeData(path, data);
             }
         });
     }
 
+    private void checkModificationState() {
+        Preconditions.checkState(type != TransactionType.READ_ONLY,
+                "Modification operation on read-only transaction is not allowed");
+        Preconditions.checkState(state == TransactionState.OPEN,
+                "Transaction is sealed - further modifications are not allowed");
+    }
+
     private boolean seal(final TransactionState newState) {
         if (state == TransactionState.OPEN) {
             state = newState;
@@ -396,78 +211,16 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
-        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
-                "Read-only transactions cannot be readied");
-
-        final boolean success = seal(TransactionState.READY);
-        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
-
-        LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
-                    txFutureCallbackMap.size());
-
-        if (txFutureCallbackMap.isEmpty()) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
-            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-            return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
-        }
-
-        throttleOperation(txFutureCallbackMap.size());
-
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
-                        txFutureCallback.getShardName(), transactionChainId);
-
-            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            final Future<ActorSelection> future;
-            if (transactionContext != null) {
-                // avoid the creation of a promise and a TransactionOperation
-                future = transactionContext.readyTransaction();
-            } else {
-                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(transactionContext.readyTransaction());
-                    }
-                });
-                future = promise.future();
-            }
-
-            cohortFutures.add(future);
-        }
-
-        onTransactionReady(cohortFutures);
-
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-            getIdentifier().toString());
-    }
-
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
-    @Override
-    public void close() {
+    public final void close() {
         if (!seal(TransactionState.CLOSED)) {
-            if (state == TransactionState.CLOSED) {
-                // Idempotent no-op as per AutoCloseable recommendation
-                return;
-            }
-
-            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
-                getIdentifier()));
+            Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
+                getIdentifier());
+            // Idempotent no-op as per AutoCloseable recommendation
+            return;
         }
 
-        for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        for (TransactionContextWrapper contextAdapter : txContextAdapters.values()) {
+            contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     transactionContext.closeTransaction();
@@ -475,309 +228,178 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             });
         }
 
-        txFutureCallbackMap.clear();
 
-        if(remoteTransactionActorsMB != null) {
-            remoteTransactionActors.clear();
-            remoteTransactionActorsMB.set(true);
-        }
+        txContextAdapters.clear();
     }
 
-    private String shardNameFromIdentifier(YangInstanceIdentifier path){
-        return ShardStrategyFactory.getStrategy(path).findShard(path);
-    }
-
-    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
-        return actorContext.findPrimaryShardAsync(shardName);
-    }
-
-    private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
-        String shardName = shardNameFromIdentifier(path);
-        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
-        if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
-
-            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
+    @Override
+    public final AbstractThreePhaseCommitCohort<?> ready() {
+        Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
 
-            txFutureCallback = newTxFutureCallback;
-            txFutureCallbackMap.put(shardName, txFutureCallback);
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
-            findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
-                @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
-                    if(failure != null) {
-                        newTxFutureCallback.createTransactionContext(failure, null);
-                    } else {
-                        newTxFutureCallback.setPrimaryShard(primaryShard);
-                    }
-                }
-            }, actorContext.getClientDispatcher());
+        LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextAdapters.size());
+
+        final AbstractThreePhaseCommitCohort<?> ret;
+        switch (txContextAdapters.size()) {
+        case 0:
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext());
+            ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+            break;
+        case 1:
+            final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(txContextAdapters.entrySet());
+            ret = createSingleCommitCohort(e.getKey(), e.getValue());
+            break;
+        default:
+            ret = createMultiCommitCohort(txContextAdapters.entrySet());
         }
 
-        return txFutureCallback;
-    }
-
-    public String getTransactionChainId() {
-        return transactionChainId;
+        txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
+        return ret;
     }
 
-    protected ActorContext getActorContext() {
-        return actorContext;
-    }
+    private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
+            final TransactionContextWrapper contextAdapter) {
+        throttleOperation();
 
-    /**
-     * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
-     * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
-     * retry task after a short delay.
-     * <p>
-     * The end result from a completed CreateTransaction message is a TransactionContext that is
-     * used to perform transaction operations. Transaction operations that occur before the
-     * CreateTransaction completes are cache and executed once the CreateTransaction completes,
-     * successfully or not.
-     */
-    private class TransactionFutureCallback extends OnComplete<Object> {
-
-        /**
-         * The list of transaction operations to execute once the CreateTransaction completes.
-         */
-        @GuardedBy("txOperationsOnComplete")
-        private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-
-        /**
-         * The TransactionContext resulting from the CreateTransaction reply.
-         */
-        private volatile TransactionContext transactionContext;
-
-        /**
-         * The target primary shard.
-         */
-        private volatile ActorSelection primaryShard;
-
-        private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
-                getShardLeaderElectionTimeout().duration().toMillis() /
-                CREATE_TX_TRY_INTERVAL.toMillis());
-
-        private final String shardName;
-
-        TransactionFutureCallback(String shardName) {
-            this.shardName = shardName;
-        }
+        LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
-        String getShardName() {
-            return shardName;
-        }
+        final OperationCallback.Reference operationCallbackRef =
+                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
 
-        TransactionContext getTransactionContext() {
-            return transactionContext;
+        final TransactionContext transactionContext = contextAdapter.getTransactionContext();
+        final Future future;
+        if (transactionContext == null) {
+            final Promise promise = akka.dispatch.Futures.promise();
+            contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+                }
+            });
+            future = promise.future();
+        } else {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
         }
 
+        return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(),
+                operationCallbackRef);
+    }
 
-        /**
-         * Sets the target primary shard and initiates a CreateTransaction try.
-         */
-        void setPrimaryShard(ActorSelection primaryShard) {
-            this.primaryShard = primaryShard;
-
-            if(transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
-                    getIdentifier(), primaryShard);
-
-                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
-                // to avoid the overhead of creating a separate transaction actor.
-                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
-                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
-                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
-            } else {
-                tryCreateTransaction();
-            }
+    private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+            OperationCallback.Reference operationCallbackRef) {
+        if (transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
+                    txContextFactory.getActorContext());
+            operationCallbackRef.set(rateLimitingCallback);
+            rateLimitingCallback.run();
+            return transactionContext.directCommit();
+        } else {
+            return transactionContext.readyTransaction();
         }
+    }
 
-        /**
-         * Adds a TransactionOperation to be executed after the CreateTransaction completes.
-         */
-        void addTxOperationOnComplete(TransactionOperation operation) {
-            boolean invokeOperation = true;
-            synchronized(txOperationsOnComplete) {
-                if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
-                    invokeOperation = false;
-                    txOperationsOnComplete.add(operation);
-                }
-            }
-
-            if(invokeOperation) {
-                operation.invoke(transactionContext);
-            }
-        }
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
+            final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        void enqueueTransactionOperation(final TransactionOperation op) {
+        throttleOperation();
+        final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
+        for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
+            LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
+            TransactionContextWrapper contextAdapter = e.getValue();
+            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
+            Future<ActorSelection> future;
             if (transactionContext != null) {
-                op.invoke(transactionContext);
+                // avoid the creation of a promise and a TransactionOperation
+                future = transactionContext.readyTransaction();
             } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                addTxOperationOnComplete(op);
-            }
-        }
+                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
+                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
+                    @Override
+                    public void invoke(TransactionContext transactionContext) {
+                        promise.completeWith(transactionContext.readyTransaction());
+                    }
+                });
 
-        /**
-         * Performs a CreateTransaction try async.
-         */
-        private void tryCreateTransaction() {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
+                future = promise.future();
             }
 
-            Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
-                    TransactionProxy.this.transactionType.ordinal(),
-                    getTransactionChainId()).toSerializable();
-
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
-
-            createTxFuture.onComplete(this, actorContext.getClientDispatcher());
+            cohortFutures.add(future);
         }
 
-        @Override
-        public void onComplete(Throwable failure, Object response) {
-            if(failure instanceof NoShardLeaderException) {
-                // There's no leader for the shard yet - schedule and try again, unless we're out
-                // of retries. Note: createTxTries is volatile as it may be written by different
-                // threads however not concurrently, therefore decrementing it non-atomically here
-                // is ok.
-                if(--createTxTries > 0) {
-                    LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
-                        getIdentifier(), shardName);
-
-                    actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
-                            new Runnable() {
-                                @Override
-                                public void run() {
-                                    tryCreateTransaction();
-                                }
-                            }, actorContext.getClientDispatcher());
-                    return;
-                }
-            }
-
-            createTransactionContext(failure, response);
-        }
+        return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
+    }
 
-        private void createTransactionContext(Throwable failure, Object response) {
-            // Mainly checking for state violation here to perform a volatile read of "initialized" to
-            // ensure updates to operationLimter et al are visible to this thread (ie we're doing
-            // "piggy-back" synchronization here).
-            Preconditions.checkState(initialized, "Tx was not propertly initialized.");
-
-            // Create the TransactionContext from the response or failure. Store the new
-            // TransactionContext locally until we've completed invoking the
-            // TransactionOperations. This avoids thread timing issues which could cause
-            // out-of-order TransactionOperations. Eg, on a modification operation, if the
-            // TransactionContext is non-null, then we directly call the TransactionContext.
-            // However, at the same time, the code may be executing the cached
-            // TransactionOperations. So to avoid thus timing, we don't publish the
-            // TransactionContext until after we've executed all cached TransactionOperations.
-            TransactionContext localTransactionContext;
-            if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
-
-                localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
-            } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
-                localTransactionContext = createValidTransactionContext(
-                        CreateTransactionReply.fromSerializable(response));
-            } else {
-                IllegalArgumentException exception = new IllegalArgumentException(String.format(
-                        "Invalid reply type %s for CreateTransaction", response.getClass()));
+    private static String shardNameFromIdentifier(final YangInstanceIdentifier path) {
+        return ShardStrategyFactory.getStrategy(path).findShard(path);
+    }
 
-                localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
-            }
+    private TransactionContextWrapper getContextAdapter(final YangInstanceIdentifier path) {
+        return getContextAdapter(shardNameFromIdentifier(path));
+    }
 
-            executeTxOperatonsOnComplete(localTransactionContext);
+    private TransactionContextWrapper getContextAdapter(final String shardName) {
+        final TransactionContextWrapper existing = txContextAdapters.get(shardName);
+        if (existing != null) {
+            return existing;
         }
 
-        private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
-            while(true) {
-                // Access to txOperationsOnComplete and transactionContext must be protected and atomic
-                // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
-                // issues and ensure no TransactionOperation is missed and that they are processed
-                // in the order they occurred.
-
-                // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
-                // in case a TransactionOperation results in another transaction operation being
-                // queued (eg a put operation from a client read Future callback that is notified
-                // synchronously).
-                Collection<TransactionOperation> operationsBatch = null;
-                synchronized(txOperationsOnComplete) {
-                    if(txOperationsOnComplete.isEmpty()) {
-                        // We're done invoking the TransactionOperations so we can now publish the
-                        // TransactionContext.
-                        transactionContext = localTransactionContext;
-                        break;
-                    }
+        final TransactionContextWrapper fresh = txContextFactory.newTransactionAdapter(this, shardName);
+        txContextAdapters.put(shardName, fresh);
+        return fresh;
+    }
 
-                    operationsBatch = new ArrayList<>(txOperationsOnComplete);
-                    txOperationsOnComplete.clear();
-                }
+    TransactionType getType() {
+        return type;
+    }
 
-                // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-                // A slight down-side is that we need to re-acquire the lock below but this should
-                // be negligible.
-                for(TransactionOperation oper: operationsBatch) {
-                    oper.invoke(localTransactionContext);
-                }
-            }
-        }
+    boolean isReady() {
+        return state != TransactionState.OPEN;
+    }
 
-        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            LOG.debug("Tx {} Received {}", getIdentifier(), reply);
+    ActorContext getActorContext() {
+        return txContextFactory.getActorContext();
+    }
 
-            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
-                    reply.getTransactionPath(), reply.getVersion());
+    OperationCompleter getCompleter() {
+        OperationCompleter ret = operationCompleter;
+        if (ret == null) {
+            final Semaphore s = getLimiter();
+            ret = new OperationCompleter(s);
+            operationCompleter = ret;
         }
 
-        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
-                String transactionPath, short remoteTransactionVersion) {
-
-            if (transactionType == TransactionType.READ_ONLY) {
-                // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-                // to close the remote Tx's when this instance is no longer in use and is garbage
-                // collected.
-
-                if(remoteTransactionActorsMB == null) {
-                    remoteTransactionActors = Lists.newArrayList();
-                    remoteTransactionActorsMB = new AtomicBoolean();
+        return ret;
+    }
 
-                    TransactionProxyCleanupPhantomReference cleanup =
-                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
-                    phantomReferenceCache.put(cleanup, cleanup);
-                }
+    Semaphore getLimiter() {
+        Semaphore ret = operationLimiter;
+        if (ret == null) {
+            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+            ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
+            operationLimiter = ret;
+        }
+        return ret;
+    }
 
-                // Add the actor to the remoteTransactionActors list for access by the
-                // cleanup PhantonReference.
-                remoteTransactionActors.add(transactionActor);
+    void throttleOperation() {
+        throttleOperation(1);
+    }
 
-                // Write to the memory barrier volatile to publish the above update to the
-                // remoteTransactionActors list for thread visibility.
-                remoteTransactionActorsMB.set(true);
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if (!getLimiter().tryAcquire(acquirePermits,
+                getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
             }
-
-            // TxActor is always created where the leader of the shard is.
-            // Check if TxActor is created in the same node
-            boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
-
-            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
-                        operationCompleter);
-            } else if (transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
             } else {
-                return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
             }
         }
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.