Convert OperationCompleter to OperationLimiter 41/22141/1
authorRobert Varga <rovarga@cisco.com>
Tue, 19 May 2015 19:50:12 +0000 (21:50 +0200)
committerMoiz Raja <moraja@cisco.com>
Tue, 9 Jun 2015 02:32:06 +0000 (02:32 +0000)
The completer and limiter functions are related to each other, so
encapsulate them in a single object.

Since a TransactionProxy cannot really do anything without touching the
limiter, make sure we instantiate it in constructor, preventing some
volatile reads in the fast path.

Change-Id: I4cf31ef46c11676611a62db7a794f504712ce5af
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit b2af021ee27b2977961f0fec6f8bb1a4acbcdbd7)

13 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java [new file with mode: 0644]

index b50426a811e3f2fd677f187ba459a2c09653c34c..c90a3f6f6f70edb1f937efc701fad31086114959 100644 (file)
@@ -178,7 +178,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         switch(parent.getType()) {
             case READ_ONLY:
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) {
+                return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
@@ -191,7 +191,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case READ_WRITE:
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) {
+                return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
@@ -204,7 +204,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 };
             case WRITE_ONLY:
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
-                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) {
+                return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
                     @Override
                     protected DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
index 3648b97eb68c11ff97e0a5c3897ed649430fd2db..dd7d899e0cb10f809cc6a1b67fddc605896f1875 100644 (file)
@@ -30,12 +30,12 @@ import scala.concurrent.Future;
 abstract class LocalTransactionContext extends AbstractTransactionContext {
 
     private final DOMStoreTransaction txDelegate;
-    private final OperationCompleter completer;
+    private final OperationLimiter limiter;
 
-    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) {
+    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
         super(identifier);
         this.txDelegate = Preconditions.checkNotNull(txDelegate);
-        this.completer = Preconditions.checkNotNull(completer);
+        this.limiter = Preconditions.checkNotNull(limiter);
     }
 
     protected abstract DOMStoreWriteTransaction getWriteDelegate();
@@ -46,21 +46,21 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().write(path, data);
-        completer.onComplete(null, null);
+        limiter.release();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         incrementModificationCount();
         getWriteDelegate().merge(path, data);
-        completer.onComplete(null, null);
+        limiter.release();
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
         incrementModificationCount();
         getWriteDelegate().delete(path);
-        completer.onComplete(null, null);
+        limiter.release();
     }
 
     @Override
@@ -69,13 +69,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
                 proxyFuture.set(result);
-                completer.onComplete(null, null);
+                limiter.release();
             }
 
             @Override
             public void onFailure(Throwable t) {
                 proxyFuture.setException(t);
-                completer.onComplete(null, null);
+                limiter.release();
             }
         });
     }
@@ -86,13 +86,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
             @Override
             public void onSuccess(Boolean result) {
                 proxyFuture.set(result);
-                completer.onComplete(null, null);
+                limiter.release();
             }
 
             @Override
             public void onFailure(Throwable t) {
                 proxyFuture.setException(t);
-                completer.onComplete(null, null);
+                limiter.release();
             }
         });
     }
@@ -100,7 +100,7 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     private LocalThreePhaseCommitCohort ready() {
         logModificationCount();
         LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
-        completer.onComplete(null, null);
+        limiter.release();
         return ready;
     }
 
index 450716800066a291d00e2b22b71a4c6306dfadaf..ff485cbab1046d4f96f15559ab10880638ea816b 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Semaphore;
-
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
@@ -26,9 +24,9 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
     private final Throwable failure;
-    private final Semaphore operationLimiter;
+    private final OperationLimiter operationLimiter;
 
-    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter) {
+    public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
         super(identifier);
         this.failure = failure;
         this.operationLimiter = operationLimiter;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java
deleted file mode 100644 (file)
index ec867dd..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.dispatch.OnComplete;
-import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-
-public final class OperationCompleter extends OnComplete<Object> {
-    private final Semaphore operationLimiter;
-
-    OperationCompleter(Semaphore operationLimiter){
-        this.operationLimiter = Preconditions.checkNotNull(operationLimiter);
-    }
-
-    @Override
-    public void onComplete(Throwable throwable, Object message) {
-        if(message instanceof BatchedModificationsReply) {
-            this.operationLimiter.release(((BatchedModificationsReply)message).getNumBatched());
-        } else {
-            this.operationLimiter.release();
-        }
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java
new file mode 100644 (file)
index 0000000..b422309
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for limiting operations. It extends {@link OnComplete}, so we can plug it seamlessly
+ * into akka to release permits as futures complete.
+ */
+public class OperationLimiter extends OnComplete<Object> {
+    private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
+    private final TransactionIdentifier identifier;
+    private final long acquireTimeout;
+    private final Semaphore semaphore;
+
+    OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+
+        Preconditions.checkArgument(acquireTimeoutSeconds >= 0);
+        this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
+
+        Preconditions.checkArgument(maxPermits >= 0);
+        this.semaphore = new Semaphore(maxPermits);
+    }
+
+    void acquire() {
+        acquire(1);
+    }
+
+    private void acquire(final int acquirePermits) {
+        try {
+            if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
+                LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
+            }
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
+            }
+        }
+    }
+
+    void release() {
+        this.semaphore.release();
+    }
+
+    @Override
+    public void onComplete(final Throwable throwable, final Object message) {
+        if (message instanceof BatchedModificationsReply) {
+            this.semaphore.release(((BatchedModificationsReply)message).getNumBatched());
+        } else {
+            this.semaphore.release();
+        }
+    }
+
+    public TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    @VisibleForTesting
+    Semaphore getSemaphore() {
+        return semaphore;
+    }
+}
index 7429ca01dfa7f416385046a099755625e9396df1..6bf0f7fc9c3e768697371bc0b216b5203c2cefaf 100644 (file)
@@ -46,19 +46,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
 
-    private final OperationCompleter operationCompleter;
+    private final OperationLimiter operationCompleter;
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
     protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+            short remoteTransactionVersion, OperationLimiter limiter) {
         super(identifier);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
-        this.operationCompleter = operationCompleter;
+        this.operationCompleter = limiter;
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
index 71d9dabf37afd6c5390d6231a4151aba21722dea..9cb062dc1c58775ca83d8aa879cbf3daf750e68a 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -75,7 +74,7 @@ final class RemoteTransactionContextSupport {
         return parent.getActorContext();
     }
 
-    private Semaphore getOperationLimiter() {
+    private OperationLimiter getOperationLimiter() {
         return parent.getLimiter();
     }
 
@@ -192,10 +191,10 @@ final class RemoteTransactionContextSupport {
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
             ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         } else {
             ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
-                isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+                isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
         }
 
         if(parent.getType() == TransactionType.READ_ONLY) {
index ea4e5139425f5eee21151a0cc759910d77b8f6cc..26d7ff8b02bf7beef1e8637fd513af26318777b4 100644 (file)
@@ -56,7 +56,7 @@ class TransactionContextWrapper {
      */
     private void enqueueTransactionOperation(final TransactionOperation operation) {
         final boolean invokeOperation;
-        synchronized(queuedTxOperations) {
+        synchronized (queuedTxOperations) {
             if (transactionContext == null) {
                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
 
index e7a00042e4146c5ddc03a013e6eeda3198a38ea3..5aafcfc88f2eb72ccdef204d938731e8e08b5f63 100644 (file)
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -55,10 +53,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
+    private final OperationLimiter limiter;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
-    private volatile OperationCompleter operationCompleter;
-    private volatile Semaphore operationLimiter;
 
     @VisibleForTesting
     public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
@@ -67,6 +64,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.limiter = new OperationLimiter(getIdentifier(),
+            getActorContext().getTransactionOutstandingOperationLimit(),
+            getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
+
         LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
@@ -76,7 +78,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
@@ -99,7 +101,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            throttleOperation();
+            limiter.acquire();
 
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
@@ -150,7 +152,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -167,7 +169,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -184,7 +186,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -263,7 +265,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        throttleOperation();
+        limiter.acquire();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -306,7 +308,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        throttleOperation();
+        limiter.acquire();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
@@ -366,43 +368,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return txContextFactory.getActorContext();
     }
 
-    OperationCompleter getCompleter() {
-        OperationCompleter ret = operationCompleter;
-        if (ret == null) {
-            final Semaphore s = getLimiter();
-            ret = new OperationCompleter(s);
-            operationCompleter = ret;
-        }
-
-        return ret;
-    }
-
-    Semaphore getLimiter() {
-        Semaphore ret = operationLimiter;
-        if (ret == null) {
-            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-            ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
-            operationLimiter = ret;
-        }
-        return ret;
-    }
-
-    void throttleOperation() {
-        throttleOperation(1);
-    }
-
-    private void throttleOperation(int acquirePermits) {
-        try {
-            if (!getLimiter().tryAcquire(acquirePermits,
-                getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
-                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
-            }
-        } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
-            }
-        }
+    OperationLimiter getLimiter() {
+        return limiter;
     }
 }
index dc82565bc0ed9789cdb5604e4a6b8f59947d48f9..4de8ab721f7b59e5b138df3acf2ac78f7c49bb9f 100644 (file)
@@ -9,7 +9,7 @@ package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.OperationCompleter;
+import org.opendaylight.controller.cluster.datastore.OperationLimiter;
 import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -37,8 +37,8 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
 
     public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+            short remoteTransactionVersion, OperationLimiter limiter) {
+        super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
         this.transactionPath = transactionPath;
     }
 
index 7593eeadd383144f0c02001004f61608562a839f..d8f74dd83222fa1e5e48486b85123548e9fcc087 100644 (file)
@@ -6,7 +6,6 @@ import static org.mockito.Mockito.verify;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Semaphore;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -21,7 +20,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class LocalTransactionContextTest {
 
     @Mock
-    Semaphore limiter;
+    OperationLimiter limiter;
 
     @Mock
     TransactionIdentifier identifier;
@@ -34,7 +33,7 @@ public class LocalTransactionContextTest {
     @Before
     public void setUp(){
         MockitoAnnotations.initMocks(this);
-        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) {
+        localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
             @Override
             protected DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
@@ -50,7 +49,7 @@ public class LocalTransactionContextTest {
     @Test
     public void testWrite(){
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
         verify(limiter).release();
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
@@ -59,7 +58,7 @@ public class LocalTransactionContextTest {
     @Test
     public void testMerge(){
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
         verify(limiter).release();
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
@@ -77,7 +76,7 @@ public class LocalTransactionContextTest {
     @Test
     public void testRead(){
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
-        NormalizedNode normalizedNode = mock(NormalizedNode.class);
+        NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
         localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
         verify(limiter).release();
@@ -102,4 +101,4 @@ public class LocalTransactionContextTest {
     }
 
 
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationCompleterTest.java
deleted file mode 100644 (file)
index 00900d3..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-
-/**
- * Unit tests for OperationCompleter.
- *
- * @author Thomas Pantelis
- */
-public class OperationCompleterTest {
-
-    @Test
-    public void testOnComplete() throws Exception {
-        int permits = 10;
-        Semaphore operationLimiter = new Semaphore(permits);
-        operationLimiter.acquire(permits);
-        int availablePermits = 0;
-
-        OperationCompleter completer = new OperationCompleter(operationLimiter );
-
-        completer.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
-        completer.onComplete(null, DataExistsReply.create(true));
-        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
-        completer.onComplete(null, new IllegalArgumentException());
-        assertEquals("availablePermits", ++availablePermits, operationLimiter.availablePermits());
-
-        completer.onComplete(null, new BatchedModificationsReply(4));
-        availablePermits += 4;
-        assertEquals("availablePermits", availablePermits, operationLimiter.availablePermits());
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java
new file mode 100644 (file)
index 0000000..40ce84b
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.Semaphore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+
+/**
+ * Unit tests for OperationCompleter.
+ *
+ * @author Thomas Pantelis
+ */
+public class OperationLimiterTest {
+
+    @Test
+    public void testOnComplete() throws Exception {
+        int permits = 10;
+        OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
+        Semaphore semaphore = limiter.getSemaphore();
+        semaphore.acquire(permits);
+        int availablePermits = 0;
+
+        limiter.onComplete(null, DataExistsReply.create(true));
+        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+        limiter.onComplete(null, DataExistsReply.create(true));
+        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+        limiter.onComplete(null, new IllegalArgumentException());
+        assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+
+        limiter.onComplete(null, new BatchedModificationsReply(4));
+        availablePermits += 4;
+        assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+    }
+}