Improve LocalProxyTransaction.doExists()
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
index 3dc4dbf1469d989c05800bce57d3a175bd2e76cd..7187f83a1ac060d41341c110181877b9535b2985 100644
@@ -16,11 +16,15 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,12 +66,13 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
 
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
     private final AtomicLong nextHistoryId = new AtomicLong(1);
+    private final StampedLock lock = new StampedLock();
     private final SingleClientHistory singleHistory;
 
     private volatile Throwable aborted;
 
     AbstractDataStoreClientBehavior(final ClientActorContext context,
-            final BackendInfoResolver<ShardBackendInfo> resolver) {
+            final AbstractShardBackendResolver resolver) {
         super(context, resolver);
         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
     }
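
Note on the narrowed constructor parameter: the superclass still stores the resolver as a plain BackendInfoResolver, but by accepting only AbstractShardBackendResolver here, the downcast performed by the new actorUtils() accessor at the end of this diff can never fail. A minimal sketch of the pattern, using hypothetical Base/Derived/Resolver names rather than the actual ODL classes:

// Hypothetical types standing in for BackendInfoResolver, AbstractShardBackendResolver
// and ActorUtils; only the narrowing/downcast pattern is the point.
interface Resolver {
}

interface ShardResolver extends Resolver {
    String actorUtils();
}

class Base {
    private final Resolver resolver;

    Base(final Resolver resolver) {
        this.resolver = resolver;
    }

    final Resolver resolver() {
        return resolver;
    }
}

class Derived extends Base {
    Derived(final ShardResolver resolver) {     // only the narrower type is accepted here...
        super(resolver);
    }

    final String actorUtils() {
        return ((ShardResolver) resolver()).actorUtils();   // ...so this cast cannot fail
    }
}
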
@@ -89,14 +94,19 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     }
 
     private void abortOperations(final Throwable cause) {
-        // This acts as a barrier, application threads check this after they have added an entry in the maps,
-        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
-        aborted = cause;
-
-        for (ClientLocalHistory h : histories.values()) {
-            h.localAbort(cause);
+        final long stamp = lock.writeLock();
+        try {
+            // This acts as a barrier, application threads check this after they have added an entry in the maps,
+            // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+            aborted = cause;
+
+            for (ClientLocalHistory h : histories.values()) {
+                h.localAbort(cause);
+            }
+            histories.clear();
+        } finally {
+            lock.unlockWrite(stamp);
         }
-        histories.clear();
     }
 
     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
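
The removed code relied on a plain volatile write as the barrier described in the comment; the replacement pairs this write lock with a read lock taken in createLocalHistory() further down in this diff, so aborting and creating histories are now mutually exclusive. A minimal standalone sketch of that pairing, assuming hypothetical Registry/Handle names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;

// Hypothetical stand-ins for AbstractDataStoreClientBehavior and ClientLocalHistory.
final class Handle {
    void localAbort(final Throwable cause) {
        // release per-handle resources
    }
}

final class Registry {
    private final Map<Long, Handle> handles = new ConcurrentHashMap<>();
    private final StampedLock lock = new StampedLock();
    private volatile Throwable aborted;

    Handle create(final long id) {
        final long stamp = lock.readLock();       // creators may run concurrently with each other
        try {
            if (aborted != null) {
                throw new IllegalStateException("Client aborted", aborted);
            }
            final Handle handle = new Handle();
            handles.put(id, handle);              // abort() cannot run until the read lock is released
            return handle;
        } finally {
            lock.unlockRead(stamp);
        }
    }

    void abort(final Throwable cause) {
        final long stamp = lock.writeLock();      // excludes every concurrent create()
        try {
            aborted = cause;                      // later create() calls observe this and fail
            handles.values().forEach(handle -> handle.localAbort(cause));
            handles.clear();
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}
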
@@ -121,6 +131,8 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
      */
     @Override
     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+        final long stamp = lock.writeLock();
+
         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
         //         further TransactionProxies can be created and we can safely traverse maps without risking
         //         missing an entry
@@ -130,25 +142,33 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
             startReconnect(h, newConn, cohorts);
         }
 
-        return previousEntries -> {
-            try {
-                // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
-                //         the non-throttling interface to the connection, hence we use a wrapper consumer
-                for (HistoryReconnectCohort c : cohorts) {
-                    c.replaySuccessfulRequests(previousEntries);
-                }
+        return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+    }
 
-                // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
-                //         requests will be immediately sent to it and requests being sent concurrently will get
-                //         forwarded once they hit the new connection.
-                return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
-            } finally {
+    private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
+            final long stamp, final Collection<HistoryReconnectCohort> cohorts,
+            final Collection<ConnectionEntry> previousEntries) {
+        try {
+            // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+            //         the non-throttling interface to the connection, hence we use a wrapper consumer
+            for (HistoryReconnectCohort c : cohorts) {
+                c.replayRequests(previousEntries);
+            }
+
+            // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+            //         requests will be immediately sent to it and requests being sent concurrently will get
+            //         forwarded once they hit the new connection.
+            return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+        } finally {
+            try {
                 // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
                 for (HistoryReconnectCohort c : cohorts) {
                     c.close();
                 }
+            } finally {
+                lock.unlockWrite(stamp);
             }
-        };
+        }
     }
 
     private static void startReconnect(final AbstractClientHistory history,
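
Note that the write lock acquired at the top of connectionUp() is deliberately not released in that method: the stamp is captured by the returned lambda and only finishReconnect() unlocks it, after the forwarder has been built and the cohorts closed. The nested try/finally guarantees the unlock even if a cohort's close() throws. A hedged sketch of that hand-off, with hypothetical names:

import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

// Hypothetical illustration: the lock is taken in one method and the stamp travels
// with the callback, which is the only place that releases it.
final class LockHandoff {
    private final StampedLock lock = new StampedLock();

    Supplier<String> begin() {                    // plays the role of connectionUp()
        final long stamp = lock.writeLock();
        return () -> finish(stamp);               // plays the role of the ConnectionConnectCohort
    }

    private String finish(final long stamp) {     // plays the role of finishReconnect()
        try {
            return "forwarder";                   // Steps 2/3: replay requests, install the forwarder
        } finally {
            try {
                cleanUp();                        // Step 4: close the cohorts; may throw
            } finally {
                lock.unlockWrite(stamp);          // released exactly once, on every path
            }
        }
    }

    private void cleanUp() {
        // close per-history reconnect cohorts
    }
}
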
@@ -170,19 +190,22 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     public final ClientLocalHistory createLocalHistory() {
         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
             nextHistoryId.getAndIncrement());
-        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
-        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
 
-        Verify.verify(histories.put(historyId, history) == null);
+        final long stamp = lock.readLock();
+        try {
+            if (aborted != null) {
+                Throwables.throwIfUnchecked(aborted);
+                throw new RuntimeException(aborted);
+            }
+
+            final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+            LOG.debug("{}: creating a new local history {}", persistenceId(), history);
 
-        final Throwable a = aborted;
-        if (a != null) {
-            history.localAbort(a);
-            histories.remove(historyId, history);
-            throw Throwables.propagate(a);
+            Verify.verify(histories.put(historyId, history) == null);
+            return history;
+        } finally {
+            lock.unlockRead(stamp);
         }
-
-        return history;
     }
 
     @Override
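
The throwIfUnchecked/new RuntimeException pair used above is the documented replacement for the now-deprecated Throwables.propagate(): unchecked throwables are rethrown as-is, checked ones are wrapped. A small sketch of the idiom, with a hypothetical helper name:

import com.google.common.base.Throwables;

// Hypothetical helper showing the idiom; a caller would write "throw propagateAborted(cause);".
final class AbortPropagation {
    static RuntimeException propagateAborted(final Throwable cause) {
        Throwables.throwIfUnchecked(cause);       // RuntimeException and Error are rethrown unchanged
        throw new RuntimeException(cause);        // checked throwables are wrapped instead
    }

    private AbortPropagation() {
    }
}
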
@@ -191,9 +214,21 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<Shard
     }
 
     @Override
-    public final void close() {
+    public final ClientSnapshot createSnapshot() {
+        return singleHistory.takeSnapshot();
+    }
+
+    @Override
+    public void close() {
+        super.close();
         context().executeInActor(this::shutdown);
     }
 
-    abstract Long resolveShardForPath(final YangInstanceIdentifier path);
+    abstract Long resolveShardForPath(YangInstanceIdentifier path);
+
+    abstract Stream<Long> resolveAllShards();
+
+    final ActorUtils actorUtils() {
+        return ((AbstractShardBackendResolver) resolver()).actorUtils();
+    }
 }
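
The new resolveAllShards() complements resolveShardForPath(): one maps a path to a single shard cookie, the other enumerates every cookie the client knows about, presumably so operations that must touch more than one shard can iterate over all of them. A hypothetical single-shard implementation of the pair, independent of the actual subclasses:

import java.util.stream.Stream;

// Hypothetical illustration of the two resolution methods added above; the real
// subclasses of AbstractDataStoreClientBehavior supply actual shard lookups.
interface ShardResolution<P> {
    Long resolveShardForPath(P path);

    Stream<Long> resolveAllShards();
}

// Single-shard variant: every path maps to cookie 0 and that is the only cookie.
final class SingleShardResolution<P> implements ShardResolution<P> {
    @Override
    public Long resolveShardForPath(final P path) {
        return 0L;
    }

    @Override
    public Stream<Long> resolveAllShards() {
        return Stream.of(0L);
    }
}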