X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=70b5960a056d943ef8f927094e3b433fb85f3021;hb=refs%2Fchanges%2F49%2F85749%2F63;hp=340cc5581fbbe5ac051b033d8df86677c5aaa3f0;hpb=f662ce8b1fa94b77ba66f7ece8bcaff91dee809e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 340cc5581f..70b5960a05 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -8,14 +8,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; -import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -24,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.checkerframework.checker.lock.qual.GuardedBy; import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; @@ -32,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -129,7 +136,7 @@ abstract class ProxyHistory implements Identifiable { @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { - Verify.verify(tx instanceof LocalProxyTransaction); + verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx); if (tx instanceof LocalReadWriteProxyTransaction && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { LOG.debug("Completed last sealed transaction {}", tx); @@ -228,7 +235,7 @@ abstract class ProxyHistory implements Identifiable { final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { - Verify.verify(req instanceof LocalHistoryRequest); + verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req); if (req instanceof CreateLocalHistoryRequest) { successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); @@ -242,13 +249,21 @@ abstract class ProxyHistory implements Identifiable { t.replayMessages(successor, previousEntries); } + // Forward any skipped transactions + final var local = skippedTransactions; + if (local != null) { + LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor); + successor.skipTransactions(local); + skippedTransactions = null; + } + // Now look for any finalizing messages it = previousEntries.iterator(); while (it.hasNext()) { final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { - Verify.verify(req instanceof LocalHistoryRequest); + verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req); if (req instanceof DestroyLocalHistoryRequest) { successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); @@ -258,10 +273,10 @@ abstract class ProxyHistory implements Identifiable { } } - @GuardedBy("lock") + @Holding("lock") @Override ProxyHistory finishReconnect() { - final ProxyHistory ret = Verify.verifyNotNull(successor); + final ProxyHistory ret = verifyNotNull(successor); for (AbstractProxyTransaction t : proxies.values()) { t.finishReconnect(); @@ -319,15 +334,35 @@ abstract class ProxyHistory implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class); private final Lock lock = new ReentrantLock(); - private final LocalHistoryIdentifier identifier; - private final AbstractClientConnection connection; - private final AbstractClientHistory parent; + private final @NonNull LocalHistoryIdentifier identifier; + private final @NonNull AbstractClientConnection connection; + private final @NonNull AbstractClientHistory parent; @GuardedBy("lock") private final Map proxies = new LinkedHashMap<>(); @GuardedBy("lock") private ProxyHistory successor; + // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of + // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as + // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion. + // + //

+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these + // into purge requests when: + // - we are about to allocate a new transaction + // - we get a successor proxy + // - the list grows unreasonably long + // + // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we + // have a {@code List}. + // FIXME: this is not tuneable, but perhaps should be + // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end. + private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256; + + @GuardedBy("lock") + private volatile List skippedTransactions; + private ProxyHistory(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { this.parent = requireNonNull(parent); @@ -351,6 +386,7 @@ abstract class ProxyHistory implements Identifiable { } @Override + // Non-final for mocking public LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -376,6 +412,7 @@ abstract class ProxyHistory implements Identifiable { return createTransactionProxy(txId, snapshotOnly, false); } + // Non-final for mocking AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { lock.lock(); @@ -394,6 +431,86 @@ abstract class ProxyHistory implements Identifiable { } } + final void skipTransaction(final TransactionIdentifier txId) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransaction(txId); + return; + } + + var local = skippedTransactions; + if (local == null) { + skippedTransactions = local = new ArrayList<>(); + } + local.add(txId); + LOG.debug("Recorded skipped transaction {}", txId); + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + @Holding("lock") + private void skipIfNeeded(final List current) { + if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) { + skippedTransactions = null; + doSkipTransactions(current); + } + } + + private void skipTransactions(final List toSkip) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransactions(toSkip); + return; + } + + var local = skippedTransactions; + if (local != null) { + local.addAll(toSkip); + } else { + skippedTransactions = local = toSkip; + } + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + private void skipTransactions() { + var local = skippedTransactions; + if (local != null) { + lock.lock(); + try { + local = skippedTransactions; + if (local != null && successor == null) { + skippedTransactions = null; + doSkipTransactions(local); + } + } finally { + lock.unlock(); + } + } + } + + @Holding("lock") + private void doSkipTransactions(final List toSkip) { + final var txIds = toSkip.stream() + .mapToLong(TransactionIdentifier::getTransactionId) + .distinct() + .sorted() + .mapToObj(UnsignedLong::fromLongBits) + .collect(ImmutableList.toImmutableList()); + + LOG.debug("Proxy {} skipping transactions {}", this, txIds); + connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier, + txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> { + LOG.debug("Proxy {} confirmed transaction skip", this); + }, connection.currentTime()); + } + final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { @@ -416,7 +533,7 @@ abstract class ProxyHistory implements Identifiable { } } - void purgeTransaction(final AbstractProxyTransaction tx) { + final void purgeTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { proxies.remove(tx.getIdentifier()); @@ -444,23 +561,26 @@ abstract class ProxyHistory implements Identifiable { final void enqueueRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { + skipTransactions(); connection.enqueueRequest(request, callback, enqueuedTicks); } final void sendRequest(final TransactionRequest request, final Consumer> callback) { + skipTransactions(); connection.sendRequest(request, callback); } - @GuardedBy("lock") + @Holding("lock") @SuppressWarnings("checkstyle:hiddenField") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, TransactionIdentifier txId, boolean snapshotOnly, boolean isDone); + @Holding("lock") @SuppressWarnings("checkstyle:hiddenField") abstract ProxyHistory createSuccessor(AbstractClientConnection connection); @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort") - ProxyReconnectCohort startReconnect(final ConnectedClientConnection newConnection) { + final ProxyReconnectCohort startReconnect(final ConnectedClientConnection newConnection) { lock.lock(); if (successor != null) { lock.unlock(); @@ -504,6 +624,7 @@ abstract class ProxyHistory implements Identifiable { // No-op for most implementations } + @Holding("lock") void onTransactionSealed(final AbstractProxyTransaction tx) { // No-op on most implementations }