Track skipped transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
index fb8764041e66bb88ee476ed20183eca2152ee549..70b5960a056d943ef8f927094e3b433fb85f3021 100644 (file)
@@ -13,10 +13,14 @@ import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -244,6 +249,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 t.replayMessages(successor, previousEntries);
             }
 
+            // Forward any skipped transactions
+            final var local = skippedTransactions;
+            if (local != null) {
+                LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+                successor.skipTransactions(local);
+                skippedTransactions = null;
+            }
+
             // Now look for any finalizing messages
             it = previousEntries.iterator();
             while (it.hasNext()) {
@@ -330,6 +343,26 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     @GuardedBy("lock")
     private ProxyHistory successor;
 
+    // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+    // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+    // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+    //
+    // <p>
+    // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+    // into purge requests when:
+    // - we are about to allocate a new transaction
+    // - we get a successor proxy
+    // - the list grows unreasonably long
+    //
+    // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+    //       have a {@code List<long>}.
+    // FIXME: this is not tuneable, but perhaps should be
+    // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+    private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+    @GuardedBy("lock")
+    private volatile List<TransactionIdentifier> skippedTransactions;
+
     private ProxyHistory(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         this.parent = requireNonNull(parent);
@@ -398,6 +431,86 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void skipTransaction(final TransactionIdentifier txId) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransaction(txId);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local == null) {
+                skippedTransactions = local = new ArrayList<>();
+            }
+            local.add(txId);
+            LOG.debug("Recorded skipped transaction {}", txId);
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Holding("lock")
+    private void skipIfNeeded(final List<TransactionIdentifier> current) {
+        if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+            skippedTransactions = null;
+            doSkipTransactions(current);
+        }
+    }
+
+    private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransactions(toSkip);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local != null) {
+                local.addAll(toSkip);
+            } else {
+                skippedTransactions = local = toSkip;
+            }
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void skipTransactions() {
+        var local = skippedTransactions;
+        if (local != null) {
+            lock.lock();
+            try {
+                local = skippedTransactions;
+                if (local != null && successor == null) {
+                    skippedTransactions = null;
+                    doSkipTransactions(local);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    @Holding("lock")
+    private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+        final var txIds = toSkip.stream()
+            .mapToLong(TransactionIdentifier::getTransactionId)
+            .distinct()
+            .sorted()
+            .mapToObj(UnsignedLong::fromLongBits)
+            .collect(ImmutableList.toImmutableList());
+
+        LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+        connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+            txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+                LOG.debug("Proxy {} confirmed transaction skip", this);
+            }, connection.currentTime());
+    }
+
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
@@ -448,10 +561,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
+        skipTransactions();
         connection.enqueueRequest(request, callback, enqueuedTicks);
     }
 
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        skipTransactions();
         connection.sendRequest(request, callback);
     }