import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
t.replayMessages(successor, previousEntries);
}
+ // Forward any skipped transactions
+ final var local = skippedTransactions;
+ if (local != null) {
+ LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+ successor.skipTransactions(local);
+ skippedTransactions = null;
+ }
+
// Now look for any finalizing messages
it = previousEntries.iterator();
while (it.hasNext()) {
@GuardedBy("lock")
private ProxyHistory successor;
+ // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+ // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+ // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+ //
+ // <p>
+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+ // into purge requests when:
+ // - we are about to allocate a new transaction
+ // - we get a successor proxy
+ // - the list grows unreasonably long
+ //
+ // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+ // have a {@code List<long>}.
+ // FIXME: this is not tuneable, but perhaps should be
+ // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+ private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+ @GuardedBy("lock")
+ private volatile List<TransactionIdentifier> skippedTransactions;
+
private ProxyHistory(final AbstractClientHistory parent,
final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
this.parent = requireNonNull(parent);
}
}
+ final void skipTransaction(final TransactionIdentifier txId) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransaction(txId);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local == null) {
+ skippedTransactions = local = new ArrayList<>();
+ }
+ local.add(txId);
+ LOG.debug("Recorded skipped transaction {}", txId);
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Holding("lock")
+ private void skipIfNeeded(final List<TransactionIdentifier> current) {
+ if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+ skippedTransactions = null;
+ doSkipTransactions(current);
+ }
+ }
+
+ private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransactions(toSkip);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local != null) {
+ local.addAll(toSkip);
+ } else {
+ skippedTransactions = local = toSkip;
+ }
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void skipTransactions() {
+ var local = skippedTransactions;
+ if (local != null) {
+ lock.lock();
+ try {
+ local = skippedTransactions;
+ if (local != null && successor == null) {
+ skippedTransactions = null;
+ doSkipTransactions(local);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Holding("lock")
+ private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+ final var txIds = toSkip.stream()
+ .mapToLong(TransactionIdentifier::getTransactionId)
+ .distinct()
+ .sorted()
+ .mapToObj(UnsignedLong::fromLongBits)
+ .collect(ImmutableList.toImmutableList());
+
+ LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+ connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+ txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+ LOG.debug("Proxy {} confirmed transaction skip", this);
+ }, connection.currentTime());
+ }
+
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
final long enqueuedTicks) {
+ skipTransactions();
connection.enqueueRequest(request, callback, enqueuedTicks);
}
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ skipTransactions();
connection.sendRequest(request, callback);
}