Track skipped transactions 49/85749/63
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 12 Nov 2019 22:50:34 +0000 (23:50 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 12 Nov 2021 06:35:26 +0000 (06:35 +0000)
We are allocating transaction identifiers which are shared across
shards. As a transaction can touch only some shards, non-participating
shards will never see the transaction ID, leading to holes in their
UnsignedLongSets -- and those holes gradually eat up more and more
memory.

Track when we have such a hole and lazily forward a new request, which
purges one or more such identifiers -- plugging the memory leak.

The crux of the machinery is exchanging state between ProxyHistory
and FrontendHistoryMetadataBuilder.

JIRA: CONTROLLER-1991
Change-Id: I3817fa2841e5f9c405bb20ff1a104537ad459ce3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
21 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendHistoryMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java
new file mode 100644 (file)
index 0000000..dd5faa8
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Collection;
+import java.util.List;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Request to skip a number of {@link TransactionIdentifier}s within a {code local history}. This request is essentially
+ * equivalent to {@link TransactionPurgeRequest} for {@link #getTarget()}, but also carries additional sibling
+ * {@link TransactionIdentifier}s in {@link #getOthers()}.
+ *
+ * <p>
+ * This request is sent by the frontend to inform the backend that a set of {@link TransactionIdentifier}s are
+ * explicitly retired and are guaranteed to never be used by the frontend.
+ */
+@Beta
+public final class SkipTransactionsRequest extends TransactionRequest<SkipTransactionsRequest> {
+    private static final long serialVersionUID = 1L;
+
+    // Note: UnsignedLong is arbitrary, yang.common.Uint64 would work just as well, we really want an immutable
+    //       List<long>, though.
+    private final @NonNull ImmutableList<UnsignedLong> others;
+
+    public SkipTransactionsRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyTo, final Collection<UnsignedLong> others) {
+        super(target, sequence, replyTo);
+        this.others = ImmutableList.copyOf(others);
+    }
+
+    /**
+     * Return this {@link #getTarget()}s sibling {@link TransactionIdentifier}s.
+     *
+     * @return Siblings values of {@link TransactionIdentifier#getTransactionId()}
+     */
+    public List<UnsignedLong> getOthers() {
+        return others;
+    }
+
+    @Override
+    protected SkipTransactionsRequestV1 externalizableProxy(final ABIVersion version) {
+        return new SkipTransactionsRequestV1(this);
+    }
+
+    @Override
+    protected SkipTransactionsRequest cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+
+    @Override
+    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        final var helper = super.addToStringAttributes(toStringHelper);
+        if (!others.isEmpty()) {
+            helper.add("others", others);
+        }
+        return helper;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java
new file mode 100644 (file)
index 0000000..c7c383c
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+/**
+ * Externalizable proxy for use with {@link SkipTransactionsRequest}. It implements the initial
+ * (Phosphorus SR1) serialization format.
+ */
+final class SkipTransactionsRequestV1 extends AbstractTransactionRequestProxy<SkipTransactionsRequest> {
+    private List<UnsignedLong> others;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public SkipTransactionsRequestV1() {
+        // For Externalizable
+    }
+
+    SkipTransactionsRequestV1(final SkipTransactionsRequest request) {
+        super(request);
+        others = request.getOthers();
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        final int size = in.readInt();
+        final var builder = ImmutableList.<UnsignedLong>builderWithExpectedSize(size);
+        int idx;
+        if (size % 2 != 0) {
+            builder.add(UnsignedLong.fromLongBits(WritableObjects.readLong(in)));
+            idx = 1;
+        } else {
+            idx = 0;
+        }
+        for (; idx < size; idx += 2) {
+            final byte hdr = WritableObjects.readLongHeader(in);
+            builder.add(UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, hdr)));
+            builder.add(UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, hdr)));
+        }
+        others = builder.build();
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        final int size = others.size();
+        out.writeInt(size);
+
+        int idx;
+        if (size % 2 != 0) {
+            WritableObjects.writeLong(out, others.get(0).longValue());
+            idx = 1;
+        } else {
+            idx = 0;
+        }
+        for (; idx < size; idx += 2) {
+            WritableObjects.writeLongs(out, others.get(idx).longValue(), others.get(idx + 1).longValue());
+        }
+    }
+
+    @Override
+    protected SkipTransactionsRequest createRequest(final TransactionIdentifier target, final long sequence,
+            final ActorRef replyToActor) {
+        return new SkipTransactionsRequest(target, sequence, replyToActor, others);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java
new file mode 100644 (file)
index 0000000..b62af7e
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link SkipTransactionsRequest}.
+ */
+@Beta
+public final class SkipTransactionsResponse extends TransactionSuccess<SkipTransactionsResponse> {
+    private static final long serialVersionUID = 1L;
+
+    public SkipTransactionsResponse(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<SkipTransactionsResponse> externalizableProxy(
+            final ABIVersion version) {
+        return new SkipTransactionsResponseProxyV1(this);
+    }
+
+    @Override
+    protected SkipTransactionsResponse cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java
new file mode 100644 (file)
index 0000000..9bc93f9
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link SkipTransactionsResponse}. It implements the initial (Phosphorus SR1)
+ * serialization format.
+ */
+final class SkipTransactionsResponseProxyV1 extends AbstractTransactionSuccessProxy<SkipTransactionsResponse> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public SkipTransactionsResponseProxyV1() {
+        // For Externalizable
+    }
+
+    SkipTransactionsResponseProxyV1(final SkipTransactionsResponse success) {
+        super(success);
+    }
+
+    @Override
+    protected SkipTransactionsResponse createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new SkipTransactionsResponse(target, sequence);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java
new file mode 100644 (file)
index 0000000..39076e3
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.primitives.UnsignedLong;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+public class SkipTransactionsRequestTest extends AbstractTransactionRequestTest<SkipTransactionsRequest> {
+    private static final SkipTransactionsRequest OBJECT = new SkipTransactionsRequest(
+            TRANSACTION_IDENTIFIER, 0, ACTOR_REF, List.of(UnsignedLong.ONE));
+
+    @Override
+    protected SkipTransactionsRequest object() {
+        return OBJECT;
+    }
+
+    @Test
+    public void cloneAsVersionTest() {
+        final SkipTransactionsRequest clone = OBJECT.cloneAsVersion(ABIVersion.BORON);
+        assertEquals(OBJECT, clone);
+    }
+
+    @Override
+    protected void doAdditionalAssertions(final Object deserialize) {
+        assertThat(deserialize, instanceOf(SkipTransactionsRequest.class));
+        assertEquals(OBJECT.getReplyTo(), ((SkipTransactionsRequest) deserialize).getReplyTo());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java
new file mode 100644 (file)
index 0000000..be70ad9
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+public class SkipTransactionsResponseTest extends AbstractTransactionSuccessTest<SkipTransactionsResponse> {
+    private static final SkipTransactionsResponse OBJECT = new SkipTransactionsResponse(
+            TRANSACTION_IDENTIFIER, 0);
+
+    @Override
+    protected SkipTransactionsResponse object() {
+        return OBJECT;
+    }
+
+    @Test
+    public void cloneAsVersionTest() {
+        final SkipTransactionsResponse clone = OBJECT.cloneAsVersion(ABIVersion.BORON);
+        assertEquals(OBJECT, clone);
+    }
+
+    @Override
+    protected void doAdditionalAssertions(final Object deserialize) {
+        assertThat(deserialize, instanceOf(SkipTransactionsResponse.class));
+    }
+}
\ No newline at end of file
index 9c290df..149f38f 100644 (file)
@@ -165,7 +165,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      *
      * <p>
      * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
-     * should never be called from an application thread.
+     * should never be called from an application thread and serves mostly for moving requests between queues.
      *
      * @param request Request to send
      * @param callback Callback to invoke
index aa4779c..003c073 100644 (file)
@@ -12,7 +12,7 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects;
-import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.eclipse.jdt.annotation.NonNull;
@@ -76,12 +76,13 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
     }
 
     private boolean commonAbort() {
-        final Collection<T> toClose = ensureClosed();
+        final Map<Long, T> toClose = ensureClosed();
         if (toClose == null) {
             return false;
         }
 
-        toClose.forEach(AbstractProxyTransaction::abort);
+        toClose.values().forEach(AbstractProxyTransaction::abort);
+        parent.onTransactionShardsBound(transactionId, toClose.keySet());
         return true;
     }
 
@@ -95,14 +96,14 @@ public abstract class AbstractClientHandle<T extends AbstractProxyTransaction> e
      * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of
      * {@link AbstractProxyTransaction} handles which need to be closed, too.
      *
-     * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be
+     * @return null if this snapshot has already been closed, otherwise a State with of proxies, which need to be
      *         closed, too.
      */
-    final @Nullable Collection<T> ensureClosed() {
+    final @Nullable Map<Long, T> ensureClosed() {
         // volatile read and a conditional CAS. This ends up being better in the typical case when we are invoked more
         // than once (see ClientBackedTransaction) than performing a STATE_UPDATER.getAndSet().
         final State<T> local = state;
-        return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local.values() : null;
+        return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local : null;
     }
 
     final T ensureProxy(final YangInstanceIdentifier path) {
index d306d13..95552b3 100644 (file)
@@ -15,6 +15,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -239,6 +240,28 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
     @Holding("this")
     abstract ClientTransaction doCreateTransaction();
 
+    /**
+     * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+     * completing with a set of participating shards.
+     *
+     * @param txId Transaction identifier
+     * @param participatingShards Participating shard cookies
+     */
+    final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+        // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+        // will not see the holes caused by this.
+        final long stamp = lock.readLock();
+        try {
+            for (var entry : histories.entrySet()) {
+                if (!participatingShards.contains(entry.getKey())) {
+                    entry.getValue().skipTransaction(txId);
+                }
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+    }
+
     /**
      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
      *
index f5855c2..7cdc04a 100644 (file)
@@ -12,6 +12,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.Beta;
 import com.google.common.util.concurrent.FluentFuture;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
@@ -79,24 +80,30 @@ public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransac
     }
 
     public DOMStoreThreePhaseCommitCohort ready() {
-        final Collection<AbstractProxyTransaction> toReady = ensureClosed();
-        checkState(toReady != null, "Attempted to submit a closed transaction %s", this);
+        final Map<Long, AbstractProxyTransaction> participants = ensureClosed();
+        checkState(participants != null, "Attempted to submit a closed transaction %s", this);
 
+        final Collection<AbstractProxyTransaction> toReady = participants.values();
         toReady.forEach(AbstractProxyTransaction::seal);
+
+        final TransactionIdentifier txId = getIdentifier();
+        final AbstractClientHistory parent = parent();
+        parent.onTransactionShardsBound(txId, participants.keySet());
+
         final AbstractTransactionCommitCohort cohort;
         switch (toReady.size()) {
             case 0:
-                cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier());
+                cohort = new EmptyTransactionCommitCohort(parent, txId);
                 break;
             case 1:
-                cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), toReady.iterator().next());
+                cohort = new DirectTransactionCommitCohort(parent, txId, toReady.iterator().next());
                 break;
             default:
-                cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady);
+                cohort = new ClientTransactionCommitCohort(parent, txId, toReady);
                 break;
         }
 
-        return parent().onTransactionReady(this, cohort);
+        return parent.onTransactionReady(this, cohort);
     }
 
     @Override
index fb87640..70b5960 100644 (file)
@@ -13,10 +13,14 @@ import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -244,6 +249,14 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 t.replayMessages(successor, previousEntries);
             }
 
+            // Forward any skipped transactions
+            final var local = skippedTransactions;
+            if (local != null) {
+                LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+                successor.skipTransactions(local);
+                skippedTransactions = null;
+            }
+
             // Now look for any finalizing messages
             it = previousEntries.iterator();
             while (it.hasNext()) {
@@ -330,6 +343,26 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     @GuardedBy("lock")
     private ProxyHistory successor;
 
+    // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+    // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+    // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+    //
+    // <p>
+    // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+    // into purge requests when:
+    // - we are about to allocate a new transaction
+    // - we get a successor proxy
+    // - the list grows unreasonably long
+    //
+    // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+    //       have a {@code List<long>}.
+    // FIXME: this is not tuneable, but perhaps should be
+    // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+    private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+    @GuardedBy("lock")
+    private volatile List<TransactionIdentifier> skippedTransactions;
+
     private ProxyHistory(final AbstractClientHistory parent,
             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
         this.parent = requireNonNull(parent);
@@ -398,6 +431,86 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
     }
 
+    final void skipTransaction(final TransactionIdentifier txId) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransaction(txId);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local == null) {
+                skippedTransactions = local = new ArrayList<>();
+            }
+            local.add(txId);
+            LOG.debug("Recorded skipped transaction {}", txId);
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Holding("lock")
+    private void skipIfNeeded(final List<TransactionIdentifier> current) {
+        if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+            skippedTransactions = null;
+            doSkipTransactions(current);
+        }
+    }
+
+    private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+        lock.lock();
+        try {
+            if (successor != null) {
+                successor.skipTransactions(toSkip);
+                return;
+            }
+
+            var local = skippedTransactions;
+            if (local != null) {
+                local.addAll(toSkip);
+            } else {
+                skippedTransactions = local = toSkip;
+            }
+            skipIfNeeded(local);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void skipTransactions() {
+        var local = skippedTransactions;
+        if (local != null) {
+            lock.lock();
+            try {
+                local = skippedTransactions;
+                if (local != null && successor == null) {
+                    skippedTransactions = null;
+                    doSkipTransactions(local);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    @Holding("lock")
+    private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+        final var txIds = toSkip.stream()
+            .mapToLong(TransactionIdentifier::getTransactionId)
+            .distinct()
+            .sorted()
+            .mapToObj(UnsignedLong::fromLongBits)
+            .collect(ImmutableList.toImmutableList());
+
+        LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+        connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+            txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+                LOG.debug("Proxy {} confirmed transaction skip", this);
+            }, connection.currentTime());
+    }
+
     final void abortTransaction(final AbstractProxyTransaction tx) {
         lock.lock();
         try {
@@ -448,10 +561,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
+        skipTransactions();
         connection.enqueueRequest(request, callback, enqueuedTicks);
     }
 
     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+        skipTransactions();
         connection.sendRequest(request, callback);
     }
 
index 22536cc..022bb7a 100644 (file)
@@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.UnsignedLong;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -24,6 +25,8 @@ import org.opendaylight.controller.cluster.access.commands.DeadTransactionExcept
 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
@@ -78,6 +81,8 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
             final RequestEnvelope envelope, final long now) throws RequestException {
         if (request instanceof TransactionPurgeRequest) {
             return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now);
+        } else if (request instanceof SkipTransactionsRequest) {
+            return handleSkipTransactionsRequest((SkipTransactionsRequest) request, envelope, now);
         }
 
         final TransactionIdentifier id = request.getTarget();
@@ -164,6 +169,43 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         return null;
     }
 
+    private SkipTransactionsResponse handleSkipTransactionsRequest(final SkipTransactionsRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final var first = request.getTarget();
+        final var others = request.getOthers();
+        final var ids = new ArrayList<UnsignedLong>(others.size() + 1);
+        ids.add(UnsignedLong.fromLongBits(first.getTransactionId()));
+        ids.addAll(others);
+
+        final var it = ids.iterator();
+        while (it.hasNext()) {
+            final var id = it.next();
+            final long bits = id.longValue();
+            if (purgedTransactions.contains(bits)) {
+                LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id);
+                it.remove();
+            } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) {
+                LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id);
+                it.remove();
+            }
+        }
+
+        if (ids.isEmpty()) {
+            LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier());
+            return new SkipTransactionsResponse(first, now);
+        }
+
+        final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray())
+            .immutableCopy();
+        LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges());
+
+        tree.skipTransactions(getIdentifier(), transactionIds, () -> {
+            purgedTransactions.addAll(transactionIds);
+            envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now);
+        });
+        return null;
+    }
+
     final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(),
index acb585e..7f281ab 100644 (file)
@@ -74,6 +74,11 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
             // No-op
         }
 
+        @Override
+        void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+            // No-op
+        }
+
         @Override
         LeaderFrontendState toLeaderState(final Shard shard) {
             return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
@@ -192,6 +197,17 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
             }
         }
 
+        @Override
+        void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+            final FrontendHistoryMetadataBuilder history = getHistory(historyId);
+            if (history != null) {
+                history.onTransactionsSkipped(txIds);
+                LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
+            } else {
+                LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
+            }
+        }
+
         @Override
         LeaderFrontendState toLeaderState(final Shard shard) {
             // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
@@ -225,15 +241,21 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
         }
 
         private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
-            LocalHistoryIdentifier historyId = txId.getHistoryId();
+            return getHistory(txId.getHistoryId());
+        }
+
+        private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
+            final LocalHistoryIdentifier local;
             if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
                 // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
                 // needs to account for that.
                 LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
-                historyId = standaloneId;
+                local = standaloneId;
+            } else {
+                local = historyId;
             }
 
-            return currentHistories.get(historyId);
+            return currentHistories.get(local);
         }
 
         private LocalHistoryIdentifier standaloneHistoryId() {
@@ -254,7 +276,7 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
     static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
         // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
         // either purged or active
-        return  meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
+        return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
             ? new Disabled(shardName, meta.getIdentifier()) : new Enabled(shardName, meta);
     }
 
@@ -279,6 +301,8 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
 
     abstract void onTransactionPurged(TransactionIdentifier txId);
 
+    abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
     /**
      * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
      *
index b00d25d..8a1efbb 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap;
 import org.opendaylight.yangtools.concepts.Builder;
@@ -75,6 +76,10 @@ final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMet
         purgedTransactions.add(txidBits);
     }
 
+    void onTransactionsSkipped(final ImmutableUnsignedLongSet txIds) {
+        purgedTransactions.addAll(txIds);
+    }
+
     /**
      * Transform frontend metadata for a particular client history into its {@link LocalFrontendHistory} counterpart.
      *
index c8341e2..e3a1899 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +121,11 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
         ensureClient(txId.getHistoryId().getClientId()).onTransactionPurged(txId);
     }
 
+    @Override
+    void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+        ensureClient(historyId.getClientId()).onTransactionsSkipped(historyId, txIds);
+    }
+
     /**
      * Transform frontend metadata into an active leader state map.
      *
index 428cf84..4aa7a7b 100644 (file)
@@ -65,7 +65,9 @@ import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionP
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -220,8 +222,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     final void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
         dataTree.setEffectiveModelContext(newSchemaContext);
-        this.schemaContext = newSchemaContext;
-        this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+        schemaContext = newSchemaContext;
+        dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
     final void resetTransactionBatch() {
@@ -385,6 +387,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
         } else if (payload instanceof PurgeLocalHistoryPayload) {
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
         }
@@ -473,6 +477,11 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
             }
             allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+        } else if (payload instanceof SkipTransactionsPayload) {
+            if (identifier != null) {
+                payloadReplicationComplete((SkipTransactionsPayload)payload);
+            }
+            allMetadataSkipTransactions((SkipTransactionsPayload) payload);
         } else {
             LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
         }
@@ -569,6 +578,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
+    private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+        final var historyId = payload.getIdentifier();
+        final var txIds = payload.getTransactionIds();
+        for (ShardDataTreeMetadata<?> m : metadata) {
+            m.onTransactionsSkipped(historyId, txIds);
+        }
+    }
+
     /**
      * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
      * this method is used for re-establishing state when we are taking over
@@ -695,6 +712,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
+    final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+            final Runnable callback) {
+        final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+        if (chain == null) {
+            LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+            if (callback != null) {
+                callback.run();
+            }
+            return;
+        }
+
+        replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+            shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+    }
+
     final Optional<DataTreeCandidate> readCurrentData() {
         return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
                 .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
index e2ca7ca..76719f9 100644 (file)
@@ -13,6 +13,7 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
 
 abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
     /**
@@ -61,6 +62,8 @@ abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>>
 
     abstract void onTransactionPurged(TransactionIdentifier txId);
 
+    abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
     abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
 
     abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java
new file mode 100644 (file)
index 0000000..ec6e227
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.io.ByteStreams;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.DataInput;
+import java.io.IOException;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is instructed some transaction identifiers, i.e. the frontend has used them
+ * for other purposes. It contains a {@link LocalHistoryIdentifier} and a list of transaction identifiers within that
+ * local history.
+ */
+public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+    private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        private ImmutableUnsignedLongSet transactionIds;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+            final var id = LocalHistoryIdentifier.readFrom(in);
+            transactionIds = ImmutableUnsignedLongSet.readFrom(in);
+            return id;
+        }
+
+        @Override
+        protected SkipTransactionsPayload createObject(final LocalHistoryIdentifier identifier,
+                final byte[] serialized) {
+            return new SkipTransactionsPayload(identifier, serialized, verifyNotNull(transactionIds));
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
+    private final @NonNull ImmutableUnsignedLongSet transactionIds;
+
+    private SkipTransactionsPayload(final @NonNull LocalHistoryIdentifier historyId,
+            final byte @NonNull [] serialized, final ImmutableUnsignedLongSet transactionIds) {
+        super(historyId, serialized);
+        this.transactionIds = requireNonNull(transactionIds);
+    }
+
+    public static @NonNull SkipTransactionsPayload create(final LocalHistoryIdentifier historyId,
+            final ImmutableUnsignedLongSet transactionIds, final int initialSerializedBufferCapacity) {
+        final var out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
+        try {
+            historyId.writeTo(out);
+            transactionIds.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {} ids {}", historyId, transactionIds, e);
+            throw new RuntimeException("Failed to serialize " + historyId + " ids " + transactionIds, e);
+        }
+
+        return new SkipTransactionsPayload(historyId, out.toByteArray(), transactionIds);
+    }
+
+    public @NonNull ImmutableUnsignedLongSet getTransactionIds() {
+        return transactionIds;
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
index a8ba69b..f332bcf 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -19,10 +21,9 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.testkit.TestProbe;
 import akka.testkit.javadsl.TestKit;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -82,8 +83,7 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
         client.getConnection(0L);
         contextProbe.expectMsgClass(ConnectClientRequest.class);
         final long sequence = 0L;
-        contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(),
-                Collections.emptyList(), dataTree, 3));
+        contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(), List.of(), dataTree, 3));
         final InternalCommand<ShardBackendInfo> command = clientContextProbe.expectMsgClass(InternalCommand.class);
         command.execute(client);
         //data tree mock
@@ -111,7 +111,7 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
 
     @Test
     public void testGetIdentifier() {
-        Assert.assertEquals(TRANSACTION_ID, handle.getIdentifier());
+        assertEquals(TRANSACTION_ID, handle.getIdentifier());
     }
 
     @Test
@@ -120,7 +120,7 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
         handle.abort();
         final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
         final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
-        Assert.assertEquals(TRANSACTION_ID, request.getTarget());
+        assertEquals(TRANSACTION_ID, request.getTarget());
         checkClosed();
     }
 
@@ -130,28 +130,28 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
         handle.localAbort(new RuntimeException("fail"));
         final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
         final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
-        Assert.assertEquals(TRANSACTION_ID, request.getTarget());
+        assertEquals(TRANSACTION_ID, request.getTarget());
         checkClosed();
     }
 
     @Test
     public void testEnsureClosed() {
         doHandleOperation(handle);
-        final Collection<AbstractProxyTransaction> transactions = handle.ensureClosed();
-        Assert.assertNotNull(transactions);
-        Assert.assertEquals(1, transactions.size());
+        final Map<Long, AbstractProxyTransaction> transactions = handle.ensureClosed();
+        assertNotNull(transactions);
+        assertEquals(1, transactions.size());
     }
 
     @Test
     public void testEnsureProxy() {
         final AbstractProxyTransaction expected = mock(AbstractProxyTransaction.class);
         final AbstractProxyTransaction proxy = handle.ensureProxy(PATH);
-        Assert.assertEquals(0, proxy.getIdentifier().getTransactionId());
+        assertEquals(0, proxy.getIdentifier().getTransactionId());
     }
 
     @Test
     public void testParent() {
-        Assert.assertEquals(parent, handle.parent());
+        assertEquals(parent, handle.parent());
     }
 
     protected void checkClosed() throws Exception {
@@ -170,7 +170,7 @@ public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<Ab
     protected <R extends Request<?, R>> R backendRespondToRequest(final Class<R> expectedRequestClass,
                                                             final Response<?, ?> response) {
         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertEquals(expectedRequestClass, envelope.getMessage().getClass());
+        assertEquals(expectedRequestClass, envelope.getMessage().getClass());
         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(0L);
         final long sessionId = envelope.getSessionId();
         final long txSequence = envelope.getTxSequence();
index 818e448..a8faac4 100644 (file)
@@ -401,21 +401,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
+    private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
         // ask based should track no metadata
         assertEquals(List.of(), clientMeta.getCurrentHistories());
     }
 
-    private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+    private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
         final var iterator = clientMeta.getCurrentHistories().iterator();
         var metadata = iterator.next();
         while (iterator.hasNext() && metadata.getHistoryId() != 1) {
             metadata = iterator.next();
         }
 
-        // FIXME: CONTROLLER-1991: remove this assumption
-        assumeTrue(false);
-
         assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions());
         assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString());
     }
@@ -441,21 +438,17 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         int numCars = 5;
         for (int i = 0; i < numCars; i++) {
-            writeTx = txChain.newWriteOnlyTransaction();
-            writeTx.close();
+            try (var tx = txChain.newWriteOnlyTransaction()) {
+                // Empty on purpose
+            }
 
             try (var tx = txChain.newReadOnlyTransaction()) {
                 tx.read(CarsModel.BASE_PATH).get();
             }
         }
 
-        writeTx = txChain.newWriteOnlyTransaction();
-        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-        followerTestKit.doCommit(writeTx.ready());
-
         // wait to let the shard catch up with purged
-        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+        await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
@@ -466,17 +459,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
                     final var clientMeta = frontendMetadata.getClients().get(0);
                     if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                        assertTellClientMetadata(clientMeta, numCars * 2 + 1);
+                        assertTellClientMetadata(clientMeta, numCars * 2);
                     } else {
                         assertAskClientMetadata(clientMeta);
                     }
                 });
-
-        try (var tx = txChain.newReadOnlyTransaction()) {
-            final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
-            assertThat(body, instanceOf(Collection.class));
-            assertEquals(numCars, ((Collection<?>) body).size());
-        }
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java
new file mode 100644 (file)
index 0000000..44012d8
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
+
+public class SkipTransactionsPayloadTest extends AbstractIdentifiablePayloadTest<SkipTransactionsPayload> {
+    @Override
+    SkipTransactionsPayload object() {
+        return SkipTransactionsPayload.create(nextHistoryId(), MutableUnsignedLongSet.of(42).immutableCopy(), 512);
+    }
+}