Improve LocalProxyTransaction.doExists()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHandle.java
index 90971b31cece3b7718e830b6a59014405d0d8a7b..d10627dcf93e065df03c5f5130da50303d9ecc79 100644 (file)
@@ -12,9 +12,10 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects;
-import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -55,6 +56,7 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
     }
 
     @Override
+    // Non-final for mocking
     public TransactionIdentifier getIdentifier() {
         return transactionId;
     }
@@ -64,6 +66,7 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
      *
      * @return True if this transaction became closed during this call
      */
+    // Non-final for mocking
     public boolean abort() {
         if (commonAbort()) {
             parent.onTransactionAbort(this);
@@ -74,12 +77,13 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
     }
 
     private boolean commonAbort() {
-        final Collection<T> toClose = ensureClosed();
+        final Map<Long, T> toClose = ensureClosed();
         if (toClose == null) {
             return false;
         }
 
-        toClose.forEach(AbstractProxyTransaction::abort);
+        toClose.values().forEach(AbstractProxyTransaction::abort);
+        parent.onTransactionShardsBound(transactionId, toClose.keySet());
         return true;
     }
 
@@ -93,21 +97,27 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
      * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of
      * {@link AbstractProxyTransaction} handles which need to be closed, too.
      *
-     * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be
+     * @return null if this snapshot has already been closed, otherwise a State with of proxies, which need to be
      *         closed, too.
      */
-    final @Nullable Collection<T> ensureClosed() {
+    final @Nullable Map<Long, T> ensureClosed() {
         // volatile read and a conditional CAS. This ends up being better in the typical case when we are invoked more
         // than once (see ClientBackedTransaction) than performing a STATE_UPDATER.getAndSet().
         final State<T> local = state;
-        return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local.values() : null;
+        return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local : null;
     }
 
     final T ensureProxy(final YangInstanceIdentifier path) {
-        final State<T> local = getState();
-        final Long shard = parent.resolveShardForPath(path);
+        return ensureProxy(getState(), parent.resolveShardForPath(path));
+    }
+
+    private T ensureProxy(final State<T> localState, final Long shard) {
+        return localState.computeIfAbsent(shard, this::createProxy);
+    }
 
-        return local.computeIfAbsent(shard, this::createProxy);
+    final Stream<T> ensureAllProxies() {
+        final var local = getState();
+        return parent.resolveAllShards().map(shard -> ensureProxy(local, shard));
     }
 
     final AbstractClientHistory parent() {