X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=70b5960a056d943ef8f927094e3b433fb85f3021;hb=refs%2Fchanges%2F49%2F85749%2F63;hp=07fcbebad01a263d38a0bfabe839e8b02832ffe2;hpb=b5444f8c2c10ded63d6a9e890db61b0f3aa2095e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 07fcbebad0..70b5960a05 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -7,22 +7,38 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.Consumer; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -30,9 +46,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +58,12 @@ import org.slf4j.LoggerFactory; */ abstract class ProxyHistory implements Identifiable { private abstract static class AbstractLocal extends ProxyHistory { - private final DataTree dataTree; + private final ReadOnlyDataTree dataTree; - AbstractLocal(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier); - this.dataTree = Preconditions.checkNotNull(dataTree); + AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { + super(parent, connection, identifier); + this.dataTree = requireNonNull(dataTree); } final DataTreeSnapshot takeSnapshot() { @@ -57,40 +72,40 @@ abstract class ProxyHistory implements Identifiable { } private abstract static class AbstractRemote extends ProxyHistory { - AbstractRemote(final AbstractClientConnection connection, + AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); - } - - @Override - final AbstractProxyTransaction doCreateTransactionProxy( - final AbstractClientConnection connection, final TransactionIdentifier txId) { - return new RemoteProxyTransaction(this, txId); + super(parent, connection, identifier); } } private static final class Local extends AbstractLocal { - private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed"); + private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed"); // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting // the open one and attempts to create a new transaction again. - private LocalProxyTransaction lastOpen; + private LocalReadWriteProxyTransaction lastOpen; - private volatile LocalProxyTransaction lastSealed; + private volatile LocalReadWriteProxyTransaction lastSealed; - Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(connection, identifier, dataTree); + Local(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { + super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { - Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); + + if (isDone) { + // Done transactions do not register on our radar on should not have any state associated. + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId) + : new LocalReadWriteProxyTransaction(this, txId); + } // onTransactionCompleted() runs concurrently - final LocalProxyTransaction localSealed = lastSealed; + final LocalReadWriteProxyTransaction localSealed = lastSealed; final DataTreeSnapshot baseSnapshot; if (localSealed != null) { baseSnapshot = localSealed.getSnapshot(); @@ -98,79 +113,97 @@ abstract class ProxyHistory implements Identifiable { baseSnapshot = takeSnapshot(); } - lastOpen = new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) baseSnapshot.newModification()); + if (snapshotOnly) { + return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot); + } + + lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot); LOG.debug("Proxy {} open transaction {}", this, lastOpen); return lastOpen; } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } @Override void onTransactionAborted(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); - lastOpen = null; + if (tx.equals(lastOpen)) { + lastOpen = null; + } } @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { - Verify.verify(tx instanceof LocalProxyTransaction); - - if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) { + verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx); + if (tx instanceof LocalReadWriteProxyTransaction + && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { LOG.debug("Completed last sealed transaction {}", tx); } } @Override void onTransactionSealed(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); + checkState(tx.equals(lastOpen)); lastSealed = lastOpen; lastOpen = null; } } private static final class LocalSingle extends AbstractLocal { - LocalSingle(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier, dataTree); + LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { + super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { - return new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) takeSnapshot().newModification()); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + final DataTreeSnapshot snapshot = takeSnapshot(); + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : + new LocalReadWriteProxyTransaction(this, txId, snapshot); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } private static final class Remote extends AbstractRemote { - Remote(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + Remote(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + super(parent, connection, identifier); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } } private static final class RemoteSingle extends AbstractRemote { - RemoteSingle(final AbstractClientConnection connection, + RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + super(parent, connection, identifier); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } @@ -193,21 +226,57 @@ abstract class ProxyHistory implements Identifiable { return identifier; } - @GuardedBy("lock") + @Holding("lock") @Override - void replaySuccessfulRequests() { + void replayRequests(final Collection previousEntries) { + // First look for our Create message + Iterator it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req); + if (req instanceof CreateLocalHistoryRequest) { + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); + it.remove(); + break; + } + } + } + for (AbstractProxyTransaction t : proxies.values()) { - LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); - LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.startReconnect(newProxy); + LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor); + t.replayMessages(successor, previousEntries); + } + + // Forward any skipped transactions + final var local = skippedTransactions; + if (local != null) { + LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor); + successor.skipTransactions(local); + skippedTransactions = null; + } + + // Now look for any finalizing messages + it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req); + if (req instanceof DestroyLocalHistoryRequest) { + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); + it.remove(); + break; + } + } } } - @GuardedBy("lock") + @Holding("lock") @Override ProxyHistory finishReconnect() { - final ProxyHistory ret = Verify.verifyNotNull(successor); + final ProxyHistory ret = verifyNotNull(successor); for (AbstractProxyTransaction t : proxies.values()) { t.finishReconnect(); @@ -219,20 +288,34 @@ abstract class ProxyHistory implements Identifiable { } @Override - void replayRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> replayTo) throws RequestException { + void replayEntry(final ConnectionEntry entry, final Consumer replayTo) + throws RequestException { + final Request request = entry.getRequest(); if (request instanceof TransactionRequest) { - replayTransactionRequest((TransactionRequest) request, callback); + lookupProxy(request).replayRequest((TransactionRequest) request, entry.getCallback(), + entry.getEnqueuedTicks()); } else if (request instanceof LocalHistoryRequest) { - replayTo.accept(request, callback); + replayTo.accept(entry); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void replayTransactionRequest(final TransactionRequest request, - final Consumer> callback) throws RequestException { + @Override + void forwardEntry(final ConnectionEntry entry, final Consumer forwardTo) + throws RequestException { + final Request request = entry.getRequest(); + if (request instanceof TransactionRequest) { + lookupProxy(request).forwardRequest((TransactionRequest) request, entry.getCallback()); + } else if (request instanceof LocalHistoryRequest) { + forwardTo.accept(entry); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + private AbstractProxyTransaction lookupProxy(final Request request) + throws RequestReplayException { final AbstractProxyTransaction proxy; lock.lock(); try { @@ -240,63 +323,106 @@ abstract class ProxyHistory implements Identifiable { } finally { lock.unlock(); } - if (proxy == null) { - throw new RequestReplayException("Failed to find proxy for %s", request); + if (proxy != null) { + return proxy; } - proxy.replayRequest(request, callback); + throw new RequestReplayException("Failed to find proxy for %s", request); } } private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class); private final Lock lock = new ReentrantLock(); - private final LocalHistoryIdentifier identifier; - private final AbstractClientConnection connection; + private final @NonNull LocalHistoryIdentifier identifier; + private final @NonNull AbstractClientConnection connection; + private final @NonNull AbstractClientHistory parent; @GuardedBy("lock") private final Map proxies = new LinkedHashMap<>(); @GuardedBy("lock") private ProxyHistory successor; - private ProxyHistory(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { - this.connection = Preconditions.checkNotNull(connection); - this.identifier = Preconditions.checkNotNull(identifier); + // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of + // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as + // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion. + // + //

+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these + // into purge requests when: + // - we are about to allocate a new transaction + // - we get a successor proxy + // - the list grows unreasonably long + // + // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we + // have a {@code List}. + // FIXME: this is not tuneable, but perhaps should be + // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end. + private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256; + + @GuardedBy("lock") + private volatile List skippedTransactions; + + private ProxyHistory(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { + this.parent = requireNonNull(parent); + this.connection = requireNonNull(connection); + this.identifier = requireNonNull(identifier); } - static ProxyHistory createClient(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { - final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get()) - : new Remote(connection, identifier); + static ProxyHistory createClient(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { + final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get()) + : new Remote(parent, connection, identifier); } - static ProxyHistory createSingle(final AbstractClientConnection connection, + static ProxyHistory createSingle(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get()) - : new RemoteSingle(connection, identifier); + final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get()) + : new RemoteSingle(parent, connection, identifier); } @Override + // Non-final for mocking public LocalHistoryIdentifier getIdentifier() { return identifier; } + final ClientActorContext context() { + return connection.context(); + } + + final long currentTime() { + return connection.currentTime(); + } + final ActorRef localActor() { return connection.localActor(); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + final AbstractClientHistory parent() { + return parent; + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, + final boolean snapshotOnly) { + return createTransactionProxy(txId, snapshotOnly, false); + } + + // Non-final for mocking + AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly, + final boolean isDone) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId); + return successor.createTransactionProxy(txId, snapshotOnly, isDone); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -305,11 +431,91 @@ abstract class ProxyHistory implements Identifiable { } } + final void skipTransaction(final TransactionIdentifier txId) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransaction(txId); + return; + } + + var local = skippedTransactions; + if (local == null) { + skippedTransactions = local = new ArrayList<>(); + } + local.add(txId); + LOG.debug("Recorded skipped transaction {}", txId); + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + @Holding("lock") + private void skipIfNeeded(final List current) { + if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) { + skippedTransactions = null; + doSkipTransactions(current); + } + } + + private void skipTransactions(final List toSkip) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransactions(toSkip); + return; + } + + var local = skippedTransactions; + if (local != null) { + local.addAll(toSkip); + } else { + skippedTransactions = local = toSkip; + } + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + private void skipTransactions() { + var local = skippedTransactions; + if (local != null) { + lock.lock(); + try { + local = skippedTransactions; + if (local != null && successor == null) { + skippedTransactions = null; + doSkipTransactions(local); + } + } finally { + lock.unlock(); + } + } + } + + @Holding("lock") + private void doSkipTransactions(final List toSkip) { + final var txIds = toSkip.stream() + .mapToLong(TransactionIdentifier::getTransactionId) + .distinct() + .sorted() + .mapToObj(UnsignedLong::fromLongBits) + .collect(ImmutableList.toImmutableList()); + + LOG.debug("Proxy {} skipping transactions {}", this, txIds); + connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier, + txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> { + LOG.debug("Proxy {} confirmed transaction skip", this); + }, connection.currentTime()); + } + final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); - LOG.debug("Proxy {} aborting transaction {}", this, tx); + // Removal will be completed once purge completes + LOG.debug("Proxy {} aborted transaction {}", this, tx); onTransactionAborted(tx); } finally { lock.unlock(); @@ -319,7 +525,7 @@ abstract class ProxyHistory implements Identifiable { final void completeTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); + // Removal will be completed once purge completes LOG.debug("Proxy {} completing transaction {}", this, tx); onTransactionCompleted(tx); } finally { @@ -327,18 +533,54 @@ abstract class ProxyHistory implements Identifiable { } } + final void purgeTransaction(final AbstractProxyTransaction tx) { + lock.lock(); + try { + proxies.remove(tx.getIdentifier()); + LOG.debug("Proxy {} purged transaction {}", this, tx); + } finally { + lock.unlock(); + } + } + + final void close() { + lock.lock(); + try { + if (successor != null) { + successor.close(); + return; + } + + LOG.debug("Proxy {} invoking destroy", this); + connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()), + this::onDestroyComplete); + } finally { + lock.unlock(); + } + } + + final void enqueueRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + skipTransactions(); + connection.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { + skipTransactions(); connection.sendRequest(request, callback); } - @GuardedBy("lock") + @Holding("lock") + @SuppressWarnings("checkstyle:hiddenField") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId); + TransactionIdentifier txId, boolean snapshotOnly, boolean isDone); + @Holding("lock") + @SuppressWarnings("checkstyle:hiddenField") abstract ProxyHistory createSuccessor(AbstractClientConnection connection); @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort") - ProxyReconnectCohort startReconnect(final ConnectedClientConnection newConnection) { + final ProxyReconnectCohort startReconnect(final ConnectedClientConnection newConnection) { lock.lock(); if (successor != null) { lock.unlock(); @@ -347,19 +589,42 @@ abstract class ProxyHistory implements Identifiable { successor = createSuccessor(newConnection); LOG.debug("History {} instantiated successor {}", this, successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.startReconnect(); + } + return new ReconnectCohort(); } - @GuardedBy("lock") + private void onDestroyComplete(final Response response) { + LOG.debug("Proxy {} destroy completed with {}", this, response); + + lock.lock(); + try { + parent.onProxyDestroyed(this); + connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()), + this::onPurgeComplete); + } finally { + lock.unlock(); + } + } + + private void onPurgeComplete(final Response response) { + LOG.debug("Proxy {} purge completed with {}", this, response); + } + + @Holding("lock") void onTransactionAborted(final AbstractProxyTransaction tx) { // No-op for most implementations } - @GuardedBy("lock") + @Holding("lock") void onTransactionCompleted(final AbstractProxyTransaction tx) { // No-op for most implementations } + @Holding("lock") void onTransactionSealed(final AbstractProxyTransaction tx) { // No-op on most implementations }