Move operation limiter down to TransactionContextWrapper 47/22147/3
authorRobert Varga <rovarga@cisco.com>
Tue, 19 May 2015 20:04:10 +0000 (22:04 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 9 Jun 2015 12:49:35 +0000 (12:49 +0000)
The limiter tracks the number of operations invoked on the shard leader,
which does not correspond to the number of operations executed on the
frontend.

The appropriate entity to decide what is throttled how is the
TransactionContext, which unfortunately may not exist. We will solve
this problem by making TransactionContextWrapper perform pessimistic
limiting as long as the context does not exist. Once the context is
materialized, the outstanding operation queue is handed off to it and
the context becomes the entity managing the limits.

This patch has the side-effect that committing a transaction requires
number of permits equal to the number of shards it touches. It also
ensures that readAll() is properly throttled.

Change-Id: If91816d806bbb3895592e1f42b0b8e389443d0f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit 9e7a9b3725ad25f9adf85f0ad796b7cf748795a4)

12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java

index df478b0..571899b 100644 (file)
@@ -7,31 +7,78 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractTransactionContext implements TransactionContext {
-
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-
+    private final OperationLimiter limiter;
     private long modificationCount = 0;
+    private boolean handoffComplete;
+
+    protected AbstractTransactionContext(final OperationLimiter limiter) {
+        this.limiter = Preconditions.checkNotNull(limiter);
+    }
 
-    private final TransactionIdentifier identifier;
+    /**
+     * Get the transaction identifier associated with this context.
+     *
+     * @return Transaction identifier.
+     */
+    @Nonnull protected final TransactionIdentifier getIdentifier() {
+        return limiter.getIdentifier();
+    }
 
-    protected AbstractTransactionContext(TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    /**
+     * Return the operation limiter associated with this context.
+     * @return Operation limiter.
+     */
+    @Nonnull protected final OperationLimiter getLimiter() {
+        return limiter;
     }
 
-    protected final TransactionIdentifier getIdentifier() {
-        return identifier;
+    /**
+     * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
+     *
+     * @return True if this context is responsible for throttling.
+     */
+    protected final boolean isOperationHandoffComplete() {
+        return handoffComplete;
     }
 
-    protected void incrementModificationCount(){
+    /**
+     * Acquire operation from the limiter if the handoff has completed. If
+     * the handoff is still ongoing, this method does nothing.
+     */
+    protected final void acquireOperation() {
+        if (handoffComplete) {
+            limiter.acquire();
+        }
+    }
+
+    /**
+     * Acquire operation from the limiter if the handoff has NOT completed. If
+     * the handoff has completed, this method does nothing.
+     */
+    protected final void releaseOperation() {
+        if (!handoffComplete) {
+            limiter.release();
+        }
+    }
+
+    protected final void incrementModificationCount() {
         modificationCount++;
     }
 
-    protected void logModificationCount(){
-        LOG.debug("Total modifications on Tx {} = [ {} ]", identifier, modificationCount);
+    protected final void logModificationCount() {
+        LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
+    }
+
+    @Override
+    public final void operationHandoffComplete() {
+        handoffComplete = true;
     }
-}
\ No newline at end of file
+}
index c90a3f6..4dff391 100644 (file)
@@ -85,11 +85,11 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
         transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getIdentifier(), parent.getLimiter()));
+                parent.getLimiter()));
     }
 
     final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
 
         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
         if(findPrimaryFuture.isCompleted()) {
@@ -178,7 +178,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(readOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -191,7 +191,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
+                return new LocalTransactionContext(readWrite, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -204,7 +204,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 267513f..4e08539 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
@@ -51,6 +52,15 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo
         return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
     }
 
+    /**
+     * Return the {@link ActorContext} associated with this object.
+     *
+     * @return An actor context instance.
+     */
+    @Nonnull ActorContext getActorContext() {
+        return actorContext;
+    }
+
     Future<ActorSelection> initiateCoordinatedCommit() {
         final Future<Object> messageFuture = initiateCommit(false);
         final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
index dd7d899..9b0accd 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -28,14 +28,11 @@ import scala.concurrent.Future;
  * @author Thomas Pantelis
  */
 abstract class LocalTransactionContext extends AbstractTransactionContext {
-
     private final DOMStoreTransaction txDelegate;
-    private final OperationLimiter limiter;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
-        super(identifier);
+    LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
+        super(limiter);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
-        this.limiter = Preconditions.checkNotNull(limiter);
     }
 
     protected abstract DOMStoreWriteTransaction getWriteDelegate();
@@ -46,36 +43,36 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().write(path, data);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().merge(path, data);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
         getWriteDelegate().delete(path);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
             @Override
-            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
-                limiter.release();
+                releaseOperation();
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                limiter.release();
+                releaseOperation();
             }
         });
     }
@@ -84,34 +81,41 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
         Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
             @Override
-            public void onSuccess(Boolean result) {
+            public void onSuccess(final Boolean result) {
                 proxyFuture.set(result);
-                limiter.release();
+                releaseOperation();
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                limiter.release();
+                releaseOperation();
             }
         });
     }
 
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
-        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
-        limiter.release();
-        return ready;
+        acquireOperation();
+        return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
+        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+        return operationFuture;
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        return ready().initiateCoordinatedCommit();
+        final LocalThreePhaseCommitCohort cohort = ready();
+        return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
     }
 
     @Override
     public Future<Object> directCommit() {
-        return ready().initiateDirectCommit();
+        final LocalThreePhaseCommitCohort cohort = ready();
+        return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
     }
 
     @Override
@@ -122,5 +126,6 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     @Override
     public void closeTransaction() {
         txDelegate.close();
+        releaseOperation();
     }
 }
index ff485cb..a142142 100644 (file)
@@ -11,7 +11,6 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -24,12 +23,10 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
     private final Throwable failure;
-    private final OperationLimiter operationLimiter;
 
-    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
-        super(identifier);
+    public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
+        super(limiter);
         this.failure = failure;
-        this.operationLimiter = operationLimiter;
     }
 
     @Override
@@ -45,41 +42,42 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public Future<Object> directCommit() {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
-        operationLimiter.release();
+        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
-        operationLimiter.release();
+        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
-        Throwable t;
-        if(failure instanceof NoShardLeaderException) {
+        releaseOperation();
+
+        final Throwable t;
+        if (failure instanceof NoShardLeaderException) {
             t = new DataStoreUnavailableException(failure.getMessage(), failure);
         } else {
             t = failure;
@@ -90,7 +88,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
 }
index 6bf0f7f..7e8a2a0 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -46,27 +45,24 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
 
-    private final OperationLimiter operationCompleter;
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+    protected RemoteTransactionContext(ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(identifier);
+        super(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
-        this.operationCompleter = limiter;
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
         return operationFuture;
     }
 
-
     private ActorSelection getActor() {
         return actor;
     }
@@ -178,6 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new DeleteModification(path));
     }
 
@@ -185,6 +182,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new MergeModification(path, data));
     }
 
@@ -192,6 +190,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new WriteModification(path, data));
     }
 
@@ -204,6 +203,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
@@ -246,6 +246,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
index 9cb062d..984d650 100644 (file)
@@ -161,7 +161,7 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
@@ -169,7 +169,7 @@ final class RemoteTransactionContextSupport {
             IllegalArgumentException exception = new IllegalArgumentException(String.format(
                     "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
         }
 
         transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
@@ -190,10 +190,10 @@ final class RemoteTransactionContextSupport {
         final TransactionContext ret;
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
                 getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         } else {
-            ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+            ret = new RemoteTransactionContext(transactionActor, getActorContext(),
                 isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         }
 
index 4eea785..e5130ed 100644 (file)
@@ -36,4 +36,14 @@ interface TransactionContext {
     boolean supportsDirectCommit();
 
     Future<Object> directCommit();
+
+    /**
+     * Invoked by {@link TransactionContextWrapper} when it has finished handing
+     * off operations to this context. From this point on, the context is responsible
+     * for throttling operations.
+     *
+     * Implementations can rely on the wrapper calling this operation in a synchronized
+     * block, so they do not need to ensure visibility of this state transition themselves.
+     */
+    void operationHandoffComplete();
 }
index 26d7ff8..b08d419 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -15,6 +18,8 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 /**
  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
@@ -37,10 +42,10 @@ class TransactionContextWrapper {
      */
     private volatile TransactionContext transactionContext;
 
-    private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
 
-    TransactionContextWrapper(final TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    TransactionContextWrapper(final OperationLimiter limiter) {
+        this.limiter = Preconditions.checkNotNull(limiter);
     }
 
     TransactionContext getTransactionContext() {
@@ -48,7 +53,7 @@ class TransactionContextWrapper {
     }
 
     TransactionIdentifier getIdentifier() {
-        return identifier;
+        return limiter.getIdentifier();
     }
 
     /**
@@ -69,6 +74,8 @@ class TransactionContextWrapper {
 
         if (invokeOperation) {
             operation.invoke(transactionContext);
+        } else {
+            limiter.acquire();
         }
     }
 
@@ -95,10 +102,11 @@ class TransactionContextWrapper {
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
             Collection<TransactionOperation> operationsBatch = null;
-            synchronized(queuedTxOperations) {
-                if(queuedTxOperations.isEmpty()) {
+            synchronized (queuedTxOperations) {
+                if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
+                    localTransactionContext.operationHandoffComplete();
                     transactionContext = localTransactionContext;
                     break;
                 }
@@ -110,9 +118,26 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
+            for (TransactionOperation oper : operationsBatch) {
                 oper.invoke(localTransactionContext);
             }
         }
     }
+
+    Future<ActorSelection> readyTransaction() {
+        // avoid the creation of a promise and a TransactionOperation
+        if (transactionContext != null) {
+            return transactionContext.readyTransaction();
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(TransactionContext transactionContext) {
+                promise.completeWith(transactionContext.readyTransaction());
+            }
+        });
+
+        return promise.future();
+    }
 }
index 5aafcfc..1bda781 100644 (file)
@@ -78,8 +78,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -101,8 +99,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            limiter.acquire();
-
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
     }
@@ -152,8 +148,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -169,8 +163,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -186,8 +178,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -265,7 +255,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        limiter.acquire();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -308,30 +297,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        limiter.acquire();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            TransactionContextWrapper contextAdapter = e.getValue();
-            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
-            Future<ActorSelection> future;
-            if (transactionContext != null) {
-                // avoid the creation of a promise and a TransactionOperation
-                future = transactionContext.readyTransaction();
-            } else {
-                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(transactionContext.readyTransaction());
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            cohortFutures.add(future);
+            cohortFutures.add(e.getValue().readyTransaction());
         }
 
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
index 4de8ab7..733543a 100644 (file)
@@ -11,7 +11,6 @@ import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationLimiter;
 import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -35,10 +34,10 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
 
     private final String transactionPath;
 
-    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+        super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
index d8f74dd..b04612e 100644 (file)
@@ -3,16 +3,19 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import akka.dispatch.ExecutionContexts;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -31,9 +34,9 @@ public class LocalTransactionContextTest {
     LocalTransactionContext localTransactionContext;
 
     @Before
-    public void setUp(){
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -47,7 +50,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testWrite(){
+    public void testWrite() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
@@ -56,7 +59,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testMerge(){
+    public void testMerge() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
@@ -65,7 +68,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testDelete(){
+    public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         localTransactionContext.deleteData(yangInstanceIdentifier);
         verify(limiter).release();
@@ -74,7 +77,7 @@ public class LocalTransactionContextTest {
 
 
     @Test
-    public void testRead(){
+    public void testRead() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
@@ -84,7 +87,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testExists(){
+    public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
@@ -93,10 +96,15 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testReady(){
-        doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+    public void testReady() {
+        final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+        final ActorContext mockContext = mock(ActorContext.class);
+        doReturn(mockContext).when(mockCohort).getActorContext();
+        doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
+        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+        doReturn(mockCohort).when(readWriteTransaction).ready();
         localTransactionContext.readyTransaction();
-        verify(limiter).release();
+        verify(limiter).onComplete(null, null);
         verify(readWriteTransaction).ready();
     }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.