Move operation limiter down to TransactionContextWrapper 47/22147/3
authorRobert Varga <rovarga@cisco.com>
Tue, 19 May 2015 20:04:10 +0000 (22:04 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 9 Jun 2015 12:49:35 +0000 (12:49 +0000)
The limiter tracks the number of operations invoked on the shard leader,
which does not correspond to the number of operations executed on the
frontend.

The appropriate entity to decide what is throttled how is the
TransactionContext, which unfortunately may not exist. We will solve
this problem by making TransactionContextWrapper perform pessimistic
limiting as long as the context does not exist. Once the context is
materialized, the outstanding operation queue is handed off to it and
the context becomes the entity managing the limits.

This patch has the side-effect that committing a transaction requires
number of permits equal to the number of shards it touches. It also
ensures that readAll() is properly throttled.

Change-Id: If91816d806bbb3895592e1f42b0b8e389443d0f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit 9e7a9b3725ad25f9adf85f0ad796b7cf748795a4)

12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java

index df478b063015c44eb2c96d304da2705d42bf785e..571899ba14343f7b8f189571e18f708b397fc86b 100644 (file)
@@ -7,31 +7,78 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractTransactionContext implements TransactionContext {
-
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-
+    private final OperationLimiter limiter;
     private long modificationCount = 0;
+    private boolean handoffComplete;
+
+    protected AbstractTransactionContext(final OperationLimiter limiter) {
+        this.limiter = Preconditions.checkNotNull(limiter);
+    }
 
-    private final TransactionIdentifier identifier;
+    /**
+     * Get the transaction identifier associated with this context.
+     *
+     * @return Transaction identifier.
+     */
+    @Nonnull protected final TransactionIdentifier getIdentifier() {
+        return limiter.getIdentifier();
+    }
 
-    protected AbstractTransactionContext(TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    /**
+     * Return the operation limiter associated with this context.
+     * @return Operation limiter.
+     */
+    @Nonnull protected final OperationLimiter getLimiter() {
+        return limiter;
     }
 
-    protected final TransactionIdentifier getIdentifier() {
-        return identifier;
+    /**
+     * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
+     *
+     * @return True if this context is responsible for throttling.
+     */
+    protected final boolean isOperationHandoffComplete() {
+        return handoffComplete;
     }
 
-    protected void incrementModificationCount(){
+    /**
+     * Acquire operation from the limiter if the handoff has completed. If
+     * the handoff is still ongoing, this method does nothing.
+     */
+    protected final void acquireOperation() {
+        if (handoffComplete) {
+            limiter.acquire();
+        }
+    }
+
+    /**
+     * Acquire operation from the limiter if the handoff has NOT completed. If
+     * the handoff has completed, this method does nothing.
+     */
+    protected final void releaseOperation() {
+        if (!handoffComplete) {
+            limiter.release();
+        }
+    }
+
+    protected final void incrementModificationCount() {
         modificationCount++;
     }
 
-    protected void logModificationCount(){
-        LOG.debug("Total modifications on Tx {} = [ {} ]", identifier, modificationCount);
+    protected final void logModificationCount() {
+        LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
+    }
+
+    @Override
+    public final void operationHandoffComplete() {
+        handoffComplete = true;
     }
-}
\ No newline at end of file
+}
index c90a3f6f6f70edb1f937efc701fad31086114959..4dff3915359aeda6cbaf933803c5f67279ad9468 100644 (file)
@@ -85,11 +85,11 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
         transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                parent.getIdentifier(), parent.getLimiter()));
+                parent.getLimiter()));
     }
 
     final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
-        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
 
         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
         if(findPrimaryFuture.isCompleted()) {
@@ -178,7 +178,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(readOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -191,7 +191,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
+                return new LocalTransactionContext(readWrite, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -204,7 +204,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
+                return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 267513fc2f1264943f82c5da11d2363dd18cbd5e..4e085399d2093d2bc84661e1d6f8679fd73f59a1 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
@@ -51,6 +52,15 @@ abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCo
         return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
     }
 
+    /**
+     * Return the {@link ActorContext} associated with this object.
+     *
+     * @return An actor context instance.
+     */
+    @Nonnull ActorContext getActorContext() {
+        return actorContext;
+    }
+
     Future<ActorSelection> initiateCoordinatedCommit() {
         final Future<Object> messageFuture = initiateCommit(false);
         final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
index dd7d899e0cb10f809cc6a1b67fddc605896f1875..9b0accd455117f464d556fa8fc8c73f1990d0b23 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -28,14 +28,11 @@ import scala.concurrent.Future;
  * @author Thomas Pantelis
  */
 abstract class LocalTransactionContext extends AbstractTransactionContext {
-
     private final DOMStoreTransaction txDelegate;
-    private final OperationLimiter limiter;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
-        super(identifier);
+    LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
+        super(limiter);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
-        this.limiter = Preconditions.checkNotNull(limiter);
     }
 
     protected abstract DOMStoreWriteTransaction getWriteDelegate();
@@ -46,36 +43,36 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().write(path, data);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().merge(path, data);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
         getWriteDelegate().delete(path);
-        limiter.release();
+        releaseOperation();
     }
 
     @Override
     public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
             @Override
-            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
-                limiter.release();
+                releaseOperation();
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                limiter.release();
+                releaseOperation();
             }
         });
     }
@@ -84,34 +81,41 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
         Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
             @Override
-            public void onSuccess(Boolean result) {
+            public void onSuccess(final Boolean result) {
                 proxyFuture.set(result);
-                limiter.release();
+                releaseOperation();
             }
 
             @Override
-            public void onFailure(Throwable t) {
+            public void onFailure(final Throwable t) {
                 proxyFuture.setException(t);
-                limiter.release();
+                releaseOperation();
             }
         });
     }
 
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
-        LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
-        limiter.release();
-        return ready;
+        acquireOperation();
+        return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
+        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+        return operationFuture;
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        return ready().initiateCoordinatedCommit();
+        final LocalThreePhaseCommitCohort cohort = ready();
+        return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
     }
 
     @Override
     public Future<Object> directCommit() {
-        return ready().initiateDirectCommit();
+        final LocalThreePhaseCommitCohort cohort = ready();
+        return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
     }
 
     @Override
@@ -122,5 +126,6 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     @Override
     public void closeTransaction() {
         txDelegate.close();
+        releaseOperation();
     }
 }
index ff485cbab1046d4f96f15559ab10880638ea816b..a1421429405506d9c4ea1c0f0d4f848bc11c7b78 100644 (file)
@@ -11,7 +11,6 @@ import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -24,12 +23,10 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
     private final Throwable failure;
-    private final OperationLimiter operationLimiter;
 
-    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
-        super(identifier);
+    public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
+        super(limiter);
         this.failure = failure;
-        this.operationLimiter = operationLimiter;
     }
 
     @Override
@@ -45,41 +42,42 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public Future<Object> directCommit() {
         LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
-        operationLimiter.release();
+        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
-        operationLimiter.release();
+        releaseOperation();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
         LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-        operationLimiter.release();
-        Throwable t;
-        if(failure instanceof NoShardLeaderException) {
+        releaseOperation();
+
+        final Throwable t;
+        if (failure instanceof NoShardLeaderException) {
             t = new DataStoreUnavailableException(failure.getMessage(), failure);
         } else {
             t = failure;
@@ -90,7 +88,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
         LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
-        operationLimiter.release();
+        releaseOperation();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
 }
index 6bf0f7fc9c3e768697371bc0b216b5203c2cefaf..7e8a2a00ebe3fb701cb66f0991ba1a0fee9e2cf0 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -46,27 +45,24 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
 
-    private final OperationLimiter operationCompleter;
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+    protected RemoteTransactionContext(ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(identifier);
+        super(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
-        this.operationCompleter = limiter;
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+        operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
         return operationFuture;
     }
 
-
     private ActorSelection getActor() {
         return actor;
     }
@@ -178,6 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new DeleteModification(path));
     }
 
@@ -185,6 +182,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new MergeModification(path, data));
     }
 
@@ -192,6 +190,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new WriteModification(path, data));
     }
 
@@ -204,6 +203,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
@@ -246,6 +246,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
index 9cb062dc1c58775ca83d8aa879cbf3daf750e68a..984d650a32fcd52db6749e48aa820d7b47b6975e 100644 (file)
@@ -161,7 +161,7 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
@@ -169,7 +169,7 @@ final class RemoteTransactionContextSupport {
             IllegalArgumentException exception = new IllegalArgumentException(String.format(
                     "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
         }
 
         transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
@@ -190,10 +190,10 @@ final class RemoteTransactionContextSupport {
         final TransactionContext ret;
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
                 getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         } else {
-            ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+            ret = new RemoteTransactionContext(transactionActor, getActorContext(),
                 isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         }
 
index 4eea785964b1e8ef9b4023390547a813408863e4..e5130ed6dc78df0420bc811c4c18546df139db0b 100644 (file)
@@ -36,4 +36,14 @@ interface TransactionContext {
     boolean supportsDirectCommit();
 
     Future<Object> directCommit();
+
+    /**
+     * Invoked by {@link TransactionContextWrapper} when it has finished handing
+     * off operations to this context. From this point on, the context is responsible
+     * for throttling operations.
+     *
+     * Implementations can rely on the wrapper calling this operation in a synchronized
+     * block, so they do not need to ensure visibility of this state transition themselves.
+     */
+    void operationHandoffComplete();
 }
index 26d7ff8b02bf7beef1e8637fd513af26318777b4..b08d4192b48c1ddd7b3cf3162782b68ef129c855 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -15,6 +18,8 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 /**
  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
@@ -37,10 +42,10 @@ class TransactionContextWrapper {
      */
     private volatile TransactionContext transactionContext;
 
-    private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
 
-    TransactionContextWrapper(final TransactionIdentifier identifier) {
-        this.identifier = identifier;
+    TransactionContextWrapper(final OperationLimiter limiter) {
+        this.limiter = Preconditions.checkNotNull(limiter);
     }
 
     TransactionContext getTransactionContext() {
@@ -48,7 +53,7 @@ class TransactionContextWrapper {
     }
 
     TransactionIdentifier getIdentifier() {
-        return identifier;
+        return limiter.getIdentifier();
     }
 
     /**
@@ -69,6 +74,8 @@ class TransactionContextWrapper {
 
         if (invokeOperation) {
             operation.invoke(transactionContext);
+        } else {
+            limiter.acquire();
         }
     }
 
@@ -95,10 +102,11 @@ class TransactionContextWrapper {
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
             Collection<TransactionOperation> operationsBatch = null;
-            synchronized(queuedTxOperations) {
-                if(queuedTxOperations.isEmpty()) {
+            synchronized (queuedTxOperations) {
+                if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
+                    localTransactionContext.operationHandoffComplete();
                     transactionContext = localTransactionContext;
                     break;
                 }
@@ -110,9 +118,26 @@ class TransactionContextWrapper {
             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
             // A slight down-side is that we need to re-acquire the lock below but this should
             // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
+            for (TransactionOperation oper : operationsBatch) {
                 oper.invoke(localTransactionContext);
             }
         }
     }
+
+    Future<ActorSelection> readyTransaction() {
+        // avoid the creation of a promise and a TransactionOperation
+        if (transactionContext != null) {
+            return transactionContext.readyTransaction();
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(TransactionContext transactionContext) {
+                promise.completeWith(transactionContext.readyTransaction());
+            }
+        });
+
+        return promise.future();
+    }
 }
index 5aafcfc88f2eb72ccdef204d938731e8e08b5f63..1bda7810edb2d5c7ea43e570b3c49e51d1aa7deb 100644 (file)
@@ -78,8 +78,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -101,8 +99,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            limiter.acquire();
-
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
     }
@@ -152,8 +148,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -169,8 +163,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -186,8 +178,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -265,7 +255,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        limiter.acquire();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -308,30 +297,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        limiter.acquire();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            TransactionContextWrapper contextAdapter = e.getValue();
-            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
-            Future<ActorSelection> future;
-            if (transactionContext != null) {
-                // avoid the creation of a promise and a TransactionOperation
-                future = transactionContext.readyTransaction();
-            } else {
-                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(transactionContext.readyTransaction());
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            cohortFutures.add(future);
+            cohortFutures.add(e.getValue().readyTransaction());
         }
 
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
index 4de8ab721f7b59e5b138df3acf2ac78f7c49bb9f..733543aabdc262a7c3d5692a2629e6bc2b05982d 100644 (file)
@@ -11,7 +11,6 @@ import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationLimiter;
 import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -35,10 +34,10 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
 
     private final String transactionPath;
 
-    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+    public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationLimiter limiter) {
-        super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+        super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
index d8f74dd83222fa1e5e48486b85123548e9fcc087..b04612ec85d9555a1feb1153a3533d73516d610e 100644 (file)
@@ -3,16 +3,19 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import akka.dispatch.ExecutionContexts;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -31,9 +34,9 @@ public class LocalTransactionContextTest {
     LocalTransactionContext localTransactionContext;
 
     @Before
-    public void setUp(){
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -47,7 +50,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testWrite(){
+    public void testWrite() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
@@ -56,7 +59,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testMerge(){
+    public void testMerge() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
@@ -65,7 +68,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testDelete(){
+    public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         localTransactionContext.deleteData(yangInstanceIdentifier);
         verify(limiter).release();
@@ -74,7 +77,7 @@ public class LocalTransactionContextTest {
 
 
     @Test
-    public void testRead(){
+    public void testRead() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
@@ -84,7 +87,7 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testExists(){
+    public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
         localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
@@ -93,10 +96,15 @@ public class LocalTransactionContextTest {
     }
 
     @Test
-    public void testReady(){
-        doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+    public void testReady() {
+        final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+        final ActorContext mockContext = mock(ActorContext.class);
+        doReturn(mockContext).when(mockCohort).getActorContext();
+        doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
+        doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+        doReturn(mockCohort).when(readWriteTransaction).ready();
         localTransactionContext.readyTransaction();
-        verify(limiter).release();
+        verify(limiter).onComplete(null, null);
         verify(readWriteTransaction).ready();
     }