Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index e75a2df4c0d6084ba6629ff2926357a7b6a8a0f1..340cc5581fbbe5ac051b033d8df86677c5aaa3f0 100644 (file)
@@ -7,8 +7,10 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
@@ -19,9 +21,9 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
@@ -37,8 +39,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +51,12 @@ import org.slf4j.LoggerFactory;
  */
 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
-        private final DataTree dataTree;
+        private final ReadOnlyDataTree dataTree;
 
         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier);
-            this.dataTree = Preconditions.checkNotNull(dataTree);
+            this.dataTree = requireNonNull(dataTree);
         }
 
         final DataTreeSnapshot takeSnapshot() {
@@ -80,14 +82,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         private volatile LocalReadWriteProxyTransaction lastSealed;
 
         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier, dataTree);
         }
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
-            Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+            checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
             if (isDone) {
                 // Done transactions do not register on our radar on should not have any state associated.
@@ -128,16 +130,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
             Verify.verify(tx instanceof LocalProxyTransaction);
-            if (tx instanceof LocalReadWriteProxyTransaction) {
-                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
-                    LOG.debug("Completed last sealed transaction {}", tx);
-                }
+            if (tx instanceof LocalReadWriteProxyTransaction
+                    && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                LOG.debug("Completed last sealed transaction {}", tx);
             }
         }
 
         @Override
         void onTransactionSealed(final AbstractProxyTransaction tx) {
-            Preconditions.checkState(tx.equals(lastOpen));
+            checkState(tx.equals(lastOpen));
             lastSealed = lastOpen;
             lastOpen = null;
         }
@@ -145,7 +146,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     private static final class LocalSingle extends AbstractLocal {
         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier, dataTree);
         }
 
@@ -218,7 +219,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             return identifier;
         }
 
-        @GuardedBy("lock")
+        @Holding("lock")
         @Override
         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
@@ -272,22 +273,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
+        void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
             if (request instanceof TransactionRequest) {
-                forwardTransactionRequest((TransactionRequest<?>) request, callback);
+                lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+                    entry.getEnqueuedTicks());
             } else if (request instanceof LocalHistoryRequest) {
-                forwardTo.accept(request, callback);
+                replayTo.accept(entry);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void forwardTransactionRequest(final TransactionRequest<?> request,
-                final Consumer<Response<?, ?>> callback) throws RequestException {
+        @Override
+        void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
+            if (request instanceof TransactionRequest) {
+                lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+            } else if (request instanceof LocalHistoryRequest) {
+                forwardTo.accept(entry);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
 
+        private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+                throws RequestReplayException {
             final AbstractProxyTransaction proxy;
             lock.lock();
             try {
@@ -295,11 +308,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             } finally {
                 lock.unlock();
             }
-            if (proxy == null) {
-                throw new RequestReplayException("Failed to find proxy for %s", request);
+            if (proxy != null) {
+                return proxy;
             }
 
-            proxy.forwardRequest(request, callback);
+            throw new RequestReplayException("Failed to find proxy for %s", request);
         }
     }
 
@@ -317,14 +330,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     private ProxyHistory(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-        this.parent = Preconditions.checkNotNull(parent);
-        this.connection = Preconditions.checkNotNull(connection);
-        this.identifier = Preconditions.checkNotNull(identifier);
+        this.parent = requireNonNull(parent);
+        this.connection = requireNonNull(connection);
+        this.identifier = requireNonNull(identifier);
     }
 
     static ProxyHistory createClient(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
              : new Remote(parent, connection, identifier);
     }
@@ -332,7 +345,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     static ProxyHistory createSingle(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
              : new RemoteSingle(parent, connection, identifier);
     }
@@ -439,9 +452,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     @GuardedBy("lock")
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
 
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
@@ -479,12 +494,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         LOG.debug("Proxy {} purge completed with {}", this, response);
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     void onTransactionCompleted(final AbstractProxyTransaction tx) {
         // No-op for most implementations
     }