Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index 7c3b2010c2ce809307646f852ee473e9d6c9cbf4..437effe9ae0df3264d65db37b6b2a4bf60377cf0 100644 (file)
@@ -7,28 +7,38 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -36,8 +46,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,12 +58,12 @@ import org.slf4j.LoggerFactory;
  */
 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     private abstract static class AbstractLocal extends ProxyHistory {
-        private final DataTree dataTree;
+        private final ReadOnlyDataTree dataTree;
 
         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier);
-            this.dataTree = Preconditions.checkNotNull(dataTree);
+            this.dataTree = requireNonNull(dataTree);
         }
 
         final DataTreeSnapshot takeSnapshot() {
@@ -79,14 +89,20 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         private volatile LocalReadWriteProxyTransaction lastSealed;
 
         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier, dataTree);
         }
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+
+            if (isDone) {
+                // Done transactions do not register on our radar on should not have any state associated.
+                return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+                        : new LocalReadWriteProxyTransaction(this, txId);
+            }
 
             // onTransactionCompleted() runs concurrently
             final LocalReadWriteProxyTransaction localSealed = lastSealed;
@@ -120,17 +136,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         void onTransactionCompleted(final AbstractProxyTransaction tx) {
-            Verify.verify(tx instanceof LocalProxyTransaction);
-            if (tx instanceof LocalReadWriteProxyTransaction) {
-                if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
-                    LOG.debug("Completed last sealed transaction {}", tx);
-                }
+            verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
+            if (tx instanceof LocalReadWriteProxyTransaction
+                    && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+                LOG.debug("Completed last sealed transaction {}", tx);
             }
         }
 
         @Override
         void onTransactionSealed(final AbstractProxyTransaction tx) {
-            Preconditions.checkState(tx.equals(lastOpen));
+            checkState(tx.equals(lastOpen));
             lastSealed = lastOpen;
             lastOpen = null;
         }
@@ -138,13 +153,13 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     private static final class LocalSingle extends AbstractLocal {
         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
-            final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+            final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
             super(parent, connection, identifier, dataTree);
         }
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
             final DataTreeSnapshot snapshot = takeSnapshot();
             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
@@ -164,8 +179,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
         }
 
         @Override
@@ -182,8 +197,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
-                final TransactionIdentifier txId, final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+                final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
         }
 
         @Override
@@ -211,7 +226,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             return identifier;
         }
 
-        @GuardedBy("lock")
+        @Holding("lock")
         @Override
         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
@@ -220,9 +235,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
-                    Verify.verify(req instanceof LocalHistoryRequest);
+                    verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
@@ -230,11 +245,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             for (AbstractProxyTransaction t : proxies.values()) {
-                LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
-                final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
-                    t.isSnapshotOnly());
-                LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replayMessages(newProxy, previousEntries);
+                LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+                t.replayMessages(successor, previousEntries);
+            }
+
+            // Forward any skipped transactions
+            final var local = skippedTransactions;
+            if (local != null) {
+                LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+                successor.skipTransactions(local);
+                skippedTransactions = null;
             }
 
             // Now look for any finalizing messages
@@ -243,9 +263,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
-                    Verify.verify(req instanceof LocalHistoryRequest);
+                    verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
                     if (req instanceof DestroyLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
@@ -253,10 +273,10 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
         }
 
-        @GuardedBy("lock")
+        @Holding("lock")
         @Override
         ProxyHistory finishReconnect() {
-            final ProxyHistory ret = Verify.verifyNotNull(successor);
+            final ProxyHistory ret = verifyNotNull(successor);
 
             for (AbstractProxyTransaction t : proxies.values()) {
                 t.finishReconnect();
@@ -268,22 +288,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
+        void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
             if (request instanceof TransactionRequest) {
-                forwardTransactionRequest((TransactionRequest<?>) request, callback);
+                lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+                    entry.getEnqueuedTicks());
             } else if (request instanceof LocalHistoryRequest) {
-                forwardTo.accept(request, callback);
+                replayTo.accept(entry);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void forwardTransactionRequest(final TransactionRequest<?> request,
-                final Consumer<Response<?, ?>> callback) throws RequestException {
+        @Override
+        void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
+            if (request instanceof TransactionRequest) {
+                lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+            } else if (request instanceof LocalHistoryRequest) {
+                forwardTo.accept(entry);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
 
+        private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+                throws RequestReplayException {
             final AbstractProxyTransaction proxy;
             lock.lock();
             try {
@@ -291,53 +323,78 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             } finally {
                 lock.unlock();
             }
-            if (proxy == null) {
-                throw new RequestReplayException("Failed to find proxy for %s", request);
+            if (proxy != null) {
+                return proxy;
             }
 
-            proxy.forwardRequest(request, callback);
+            throw new RequestReplayException("Failed to find proxy for %s", request);
         }
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
 
     private final Lock lock = new ReentrantLock();
-    private final LocalHistoryIdentifier identifier;
-    private final AbstractClientConnection<ShardBackendInfo> connection;
-    private final AbstractClientHistory parent;
+    private final @NonNull LocalHistoryIdentifier identifier;
+    private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
+    private final @NonNull AbstractClientHistory parent;
 
     @GuardedBy("lock")
     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
     @GuardedBy("lock")
     private ProxyHistory successor;
 
+    // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+    // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+    // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+    //
+    // <p>
+    // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+    // into purge requests when:
+    // - we are about to allocate a new transaction
+    // - we get a successor proxy
+    // - the list grows unreasonably long
+    //
+    // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+    //       have a {@code List<long>}.
+    // FIXME: this is not tuneable, but perhaps should be
+    // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+    private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+    @GuardedBy("lock")
+    private volatile List<TransactionIdentifier> skippedTransactions;
+
     private ProxyHistory(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-        this.parent = Preconditions.checkNotNull(parent);
-        this.connection = Preconditions.checkNotNull(connection);
-        this.identifier = Preconditions.checkNotNull(identifier);
+        this.parent = requireNonNull(parent);
+        this.connection = requireNonNull(connection);
+        this.identifier = requireNonNull(identifier);
     }
 
     static ProxyHistory createClient(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+        final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.orElseThrow())
              : new Remote(parent, connection, identifier);
     }
 
     static ProxyHistory createSingle(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection,
             final LocalHistoryIdentifier identifier) {
-        final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+        final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.orElseThrow())
              : new RemoteSingle(parent, connection, identifier);
     }
 
     @Override
+    // Non-final for mocking
     public LocalHistoryIdentifier getIdentifier() {
         return identifier;
     }
 
+    final ClientActorContext context() {
+        return connection.context();
+    }
+
     final long currentTime() {
         return connection.currentTime();
     }
@@ -352,14 +409,20 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
             final boolean snapshotOnly) {
+        return createTransactionProxy(txId, snapshotOnly, false);
+    }
+
+    // Non-final for mocking
+    AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+            final boolean isDone) {
         lock.lock();
         try {
             if (successor != null) {
-                return successor.createTransactionProxy(txId, snapshotOnly);
+                return successor.createTransactionProxy(txId, snapshotOnly, isDone);
             }
 
             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+            final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
             return ret;
@@ -368,11 +431,91 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void skipTransaction(final TransactionIdentifier txId) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransaction(txId);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local == null) {
+                skippedTransactions = local = new ArrayList<>();
+            }
+            local.add(txId);
+            LOG.debug("Recorded skipped transaction {}", txId);
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Holding("lock")
+    private void skipIfNeeded(final List<TransactionIdentifier> current) {
+        if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+            skippedTransactions = null;
+            doSkipTransactions(current);
+        }
+    }
+
+    private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransactions(toSkip);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local != null) {
+                local.addAll(toSkip);
+            } else {
+                skippedTransactions = local = toSkip;
+            }
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void skipTransactions() {
+        var local = skippedTransactions;
+        if (local != null) {
+            lock.lock();
+            try {
+                local = skippedTransactions;
+                if (local != null && successor == null) {
+                    skippedTransactions = null;
+                    doSkipTransactions(local);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    @Holding("lock")
+    private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+        final var txIds = toSkip.stream()
+            .mapToLong(TransactionIdentifier::getTransactionId)
+            .distinct()
+            .sorted()
+            .mapToObj(UnsignedLong::fromLongBits)
+            .collect(ImmutableList.toImmutableList());
+
+        LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+        connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+            txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+                LOG.debug("Proxy {} confirmed transaction skip", this);
+            }, connection.currentTime());
+    }
+
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
-            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            // Removal will be completed once purge completes
+            LOG.debug("Proxy {} aborted transaction {}", this, tx);
             onTransactionAborted(tx);
         } finally {
             lock.unlock();
@@ -382,7 +525,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     final void completeTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
-            proxies.remove(tx.getIdentifier());
+            // Removal will be completed once purge completes
             LOG.debug("Proxy {} completing transaction {}", this, tx);
             onTransactionCompleted(tx);
         } finally {
@@ -390,6 +533,16 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void purgeTransaction(final AbstractProxyTransaction tx) {
+        lock.lock();
+        try {
+            proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} purged transaction {}", this, tx);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     final void close() {
         lock.lock();
         try {
@@ -408,21 +561,26 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
+        skipTransactions();
         connection.enqueueRequest(request, callback, enqueuedTicks);
     }
 
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        skipTransactions();
         connection.sendRequest(request, callback);
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
-            TransactionIdentifier txId, boolean snapshotOnly);
+            TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
 
+    @Holding("lock")
+    @SuppressWarnings("checkstyle:hiddenField")
     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
 
     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
-    ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
+    final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
         lock.lock();
         if (successor != null) {
             lock.unlock();
@@ -456,16 +614,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         LOG.debug("Proxy {} purge completed with {}", this, response);
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     void onTransactionAborted(final AbstractProxyTransaction tx) {
         // No-op for most implementations
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     void onTransactionCompleted(final AbstractProxyTransaction tx) {
         // No-op for most implementations
     }
 
+    @Holding("lock")
     void onTransactionSealed(final AbstractProxyTransaction tx) {
         // No-op on most implementations
     }