From c7078128d6f35eebee2f98108ff929dcccfc322d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 12 Nov 2019 23:50:34 +0100 Subject: [PATCH] Track skipped transactions We are allocating transaction identifiers which are shared across shards. As a transaction can touch only some shards, non-participating shards will never see the transaction ID, leading to holes in their UnsignedLongSets -- and those holes gradually eat up more and more memory. Track when we have such a hole and lazily forward a new request, which purges one or more such identifiers -- plugging the memory leak. The crux of the machinery is exchanging state between ProxyHistory and FrontendHistoryMetadataBuilder. JIRA: CONTROLLER-1991 Change-Id: I3817fa2841e5f9c405bb20ff1a104537ad459ce3 Signed-off-by: Robert Varga --- .../commands/SkipTransactionsRequest.java | 71 +++++++++++ .../commands/SkipTransactionsRequestV1.java | 84 +++++++++++++ .../commands/SkipTransactionsResponse.java | 35 ++++++ .../SkipTransactionsResponseProxyV1.java | 34 ++++++ .../commands/SkipTransactionsRequestTest.java | 39 ++++++ .../SkipTransactionsResponseTest.java | 36 ++++++ .../client/AbstractClientConnection.java | 2 +- .../actors/dds/AbstractClientHandle.java | 13 +- .../actors/dds/AbstractClientHistory.java | 23 ++++ .../actors/dds/ClientTransaction.java | 19 ++- .../databroker/actors/dds/ProxyHistory.java | 115 ++++++++++++++++++ .../datastore/AbstractFrontendHistory.java | 42 +++++++ .../FrontendClientMetadataBuilder.java | 32 ++++- .../FrontendHistoryMetadataBuilder.java | 5 + .../cluster/datastore/FrontendMetadata.java | 6 + .../cluster/datastore/ShardDataTree.java | 36 +++++- .../datastore/ShardDataTreeMetadata.java | 3 + .../persisted/SkipTransactionsPayload.java | 94 ++++++++++++++ .../actors/dds/AbstractClientHandleTest.java | 28 ++--- ...butedDataStoreRemotingIntegrationTest.java | 27 ++-- .../SkipTransactionsPayloadTest.java | 17 +++ 21 files changed, 708 insertions(+), 53 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java new file mode 100644 index 0000000000..dd5faa8e87 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequest.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import com.google.common.base.MoreObjects.ToStringHelper; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.UnsignedLong; +import java.util.Collection; +import java.util.List; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Request to skip a number of {@link TransactionIdentifier}s within a {code local history}. This request is essentially + * equivalent to {@link TransactionPurgeRequest} for {@link #getTarget()}, but also carries additional sibling + * {@link TransactionIdentifier}s in {@link #getOthers()}. + * + *

+ * This request is sent by the frontend to inform the backend that a set of {@link TransactionIdentifier}s are + * explicitly retired and are guaranteed to never be used by the frontend. + */ +@Beta +public final class SkipTransactionsRequest extends TransactionRequest { + private static final long serialVersionUID = 1L; + + // Note: UnsignedLong is arbitrary, yang.common.Uint64 would work just as well, we really want an immutable + // List, though. + private final @NonNull ImmutableList others; + + public SkipTransactionsRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyTo, final Collection others) { + super(target, sequence, replyTo); + this.others = ImmutableList.copyOf(others); + } + + /** + * Return this {@link #getTarget()}s sibling {@link TransactionIdentifier}s. + * + * @return Siblings values of {@link TransactionIdentifier#getTransactionId()} + */ + public List getOthers() { + return others; + } + + @Override + protected SkipTransactionsRequestV1 externalizableProxy(final ABIVersion version) { + return new SkipTransactionsRequestV1(this); + } + + @Override + protected SkipTransactionsRequest cloneAsVersion(final ABIVersion version) { + return this; + } + + @Override + protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + final var helper = super.addToStringAttributes(toStringHelper); + if (!others.isEmpty()) { + helper.add("others", others); + } + return helper; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java new file mode 100644 index 0000000000..c7c383cf8d --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestV1.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.UnsignedLong; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.WritableObjects; + +/** + * Externalizable proxy for use with {@link SkipTransactionsRequest}. It implements the initial + * (Phosphorus SR1) serialization format. + */ +final class SkipTransactionsRequestV1 extends AbstractTransactionRequestProxy { + private List others; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public SkipTransactionsRequestV1() { + // For Externalizable + } + + SkipTransactionsRequestV1(final SkipTransactionsRequest request) { + super(request); + others = request.getOthers(); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + final int size = in.readInt(); + final var builder = ImmutableList.builderWithExpectedSize(size); + int idx; + if (size % 2 != 0) { + builder.add(UnsignedLong.fromLongBits(WritableObjects.readLong(in))); + idx = 1; + } else { + idx = 0; + } + for (; idx < size; idx += 2) { + final byte hdr = WritableObjects.readLongHeader(in); + builder.add(UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, hdr))); + builder.add(UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, hdr))); + } + others = builder.build(); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + super.writeExternal(out); + + final int size = others.size(); + out.writeInt(size); + + int idx; + if (size % 2 != 0) { + WritableObjects.writeLong(out, others.get(0).longValue()); + idx = 1; + } else { + idx = 0; + } + for (; idx < size; idx += 2) { + WritableObjects.writeLongs(out, others.get(idx).longValue(), others.get(idx + 1).longValue()); + } + } + + @Override + protected SkipTransactionsRequest createRequest(final TransactionIdentifier target, final long sequence, + final ActorRef replyToActor) { + return new SkipTransactionsRequest(target, sequence, replyToActor, others); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java new file mode 100644 index 0000000000..b62af7e7dc --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponse.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Successful reply to a {@link SkipTransactionsRequest}. + */ +@Beta +public final class SkipTransactionsResponse extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public SkipTransactionsResponse(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); + } + + @Override + protected AbstractTransactionSuccessProxy externalizableProxy( + final ABIVersion version) { + return new SkipTransactionsResponseProxyV1(this); + } + + @Override + protected SkipTransactionsResponse cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java new file mode 100644 index 0000000000..9bc93f9497 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link SkipTransactionsResponse}. It implements the initial (Phosphorus SR1) + * serialization format. + */ +final class SkipTransactionsResponseProxyV1 extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public SkipTransactionsResponseProxyV1() { + // For Externalizable + } + + SkipTransactionsResponseProxyV1(final SkipTransactionsResponse success) { + super(success); + } + + @Override + protected SkipTransactionsResponse createSuccess(final TransactionIdentifier target, final long sequence) { + return new SkipTransactionsResponse(target, sequence); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java new file mode 100644 index 0000000000..39076e3007 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsRequestTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import com.google.common.primitives.UnsignedLong; +import java.util.List; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.ABIVersion; + +public class SkipTransactionsRequestTest extends AbstractTransactionRequestTest { + private static final SkipTransactionsRequest OBJECT = new SkipTransactionsRequest( + TRANSACTION_IDENTIFIER, 0, ACTOR_REF, List.of(UnsignedLong.ONE)); + + @Override + protected SkipTransactionsRequest object() { + return OBJECT; + } + + @Test + public void cloneAsVersionTest() { + final SkipTransactionsRequest clone = OBJECT.cloneAsVersion(ABIVersion.BORON); + assertEquals(OBJECT, clone); + } + + @Override + protected void doAdditionalAssertions(final Object deserialize) { + assertThat(deserialize, instanceOf(SkipTransactionsRequest.class)); + assertEquals(OBJECT.getReplyTo(), ((SkipTransactionsRequest) deserialize).getReplyTo()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java new file mode 100644 index 0000000000..be70ad96ea --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/test/java/org/opendaylight/controller/cluster/access/commands/SkipTransactionsResponseTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.opendaylight.controller.cluster.access.ABIVersion; + +public class SkipTransactionsResponseTest extends AbstractTransactionSuccessTest { + private static final SkipTransactionsResponse OBJECT = new SkipTransactionsResponse( + TRANSACTION_IDENTIFIER, 0); + + @Override + protected SkipTransactionsResponse object() { + return OBJECT; + } + + @Test + public void cloneAsVersionTest() { + final SkipTransactionsResponse clone = OBJECT.cloneAsVersion(ABIVersion.BORON); + assertEquals(OBJECT, clone); + } + + @Override + protected void doAdditionalAssertions(final Object deserialize) { + assertThat(deserialize, instanceOf(SkipTransactionsResponse.class)); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 9c290df13d..149f38f9b6 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -165,7 +165,7 @@ public abstract class AbstractClientConnection { * *

* Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it - * should never be called from an application thread. + * should never be called from an application thread and serves mostly for moving requests between queues. * * @param request Request to send * @param callback Callback to invoke diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java index aa4779c898..003c073de8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java @@ -12,7 +12,7 @@ import static java.util.Objects.requireNonNull; import com.google.common.annotations.Beta; import com.google.common.base.MoreObjects; -import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.eclipse.jdt.annotation.NonNull; @@ -76,12 +76,13 @@ public abstract class AbstractClientHandle e } private boolean commonAbort() { - final Collection toClose = ensureClosed(); + final Map toClose = ensureClosed(); if (toClose == null) { return false; } - toClose.forEach(AbstractProxyTransaction::abort); + toClose.values().forEach(AbstractProxyTransaction::abort); + parent.onTransactionShardsBound(transactionId, toClose.keySet()); return true; } @@ -95,14 +96,14 @@ public abstract class AbstractClientHandle e * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of * {@link AbstractProxyTransaction} handles which need to be closed, too. * - * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be + * @return null if this snapshot has already been closed, otherwise a State with of proxies, which need to be * closed, too. */ - final @Nullable Collection ensureClosed() { + final @Nullable Map ensureClosed() { // volatile read and a conditional CAS. This ends up being better in the typical case when we are invoked more // than once (see ClientBackedTransaction) than performing a STATE_UPDATER.getAndSet(). final State local = state; - return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local.values() : null; + return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local : null; } final T ensureProxy(final YangInstanceIdentifier path) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index d306d13e2e..95552b382e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -15,6 +15,7 @@ import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -239,6 +240,28 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id @Holding("this") abstract ClientTransaction doCreateTransaction(); + /** + * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is + * completing with a set of participating shards. + * + * @param txId Transaction identifier + * @param participatingShards Participating shard cookies + */ + final void onTransactionShardsBound(final TransactionIdentifier txId, final Set participatingShards) { + // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those + // will not see the holes caused by this. + final long stamp = lock.readLock(); + try { + for (var entry : histories.entrySet()) { + if (!participatingShards.contains(entry.getKey())) { + entry.getValue().skipTransaction(txId); + } + } + } finally { + lock.unlockRead(stamp); + } + } + /** * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index f5855c279a..7cdc04aba1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -12,6 +12,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.Beta; import com.google.common.util.concurrent.FluentFuture; import java.util.Collection; +import java.util.Map; import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; @@ -79,24 +80,30 @@ public class ClientTransaction extends AbstractClientHandle toReady = ensureClosed(); - checkState(toReady != null, "Attempted to submit a closed transaction %s", this); + final Map participants = ensureClosed(); + checkState(participants != null, "Attempted to submit a closed transaction %s", this); + final Collection toReady = participants.values(); toReady.forEach(AbstractProxyTransaction::seal); + + final TransactionIdentifier txId = getIdentifier(); + final AbstractClientHistory parent = parent(); + parent.onTransactionShardsBound(txId, participants.keySet()); + final AbstractTransactionCommitCohort cohort; switch (toReady.size()) { case 0: - cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier()); + cohort = new EmptyTransactionCommitCohort(parent, txId); break; case 1: - cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), toReady.iterator().next()); + cohort = new DirectTransactionCommitCohort(parent, txId, toReady.iterator().next()); break; default: - cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady); + cohort = new ClientTransactionCommitCohort(parent, txId, toReady); break; } - return parent().onTransactionReady(this, cohort); + return parent.onTransactionReady(this, cohort); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index fb8764041e..70b5960a05 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -13,10 +13,14 @@ import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.UnsignedLong; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -244,6 +249,14 @@ abstract class ProxyHistory implements Identifiable { t.replayMessages(successor, previousEntries); } + // Forward any skipped transactions + final var local = skippedTransactions; + if (local != null) { + LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor); + successor.skipTransactions(local); + skippedTransactions = null; + } + // Now look for any finalizing messages it = previousEntries.iterator(); while (it.hasNext()) { @@ -330,6 +343,26 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") private ProxyHistory successor; + // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of + // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as + // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion. + // + //

+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these + // into purge requests when: + // - we are about to allocate a new transaction + // - we get a successor proxy + // - the list grows unreasonably long + // + // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we + // have a {@code List}. + // FIXME: this is not tuneable, but perhaps should be + // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end. + private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256; + + @GuardedBy("lock") + private volatile List skippedTransactions; + private ProxyHistory(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { this.parent = requireNonNull(parent); @@ -398,6 +431,86 @@ abstract class ProxyHistory implements Identifiable { } } + final void skipTransaction(final TransactionIdentifier txId) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransaction(txId); + return; + } + + var local = skippedTransactions; + if (local == null) { + skippedTransactions = local = new ArrayList<>(); + } + local.add(txId); + LOG.debug("Recorded skipped transaction {}", txId); + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + @Holding("lock") + private void skipIfNeeded(final List current) { + if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) { + skippedTransactions = null; + doSkipTransactions(current); + } + } + + private void skipTransactions(final List toSkip) { + lock.lock(); + try { + if (successor != null) { + successor.skipTransactions(toSkip); + return; + } + + var local = skippedTransactions; + if (local != null) { + local.addAll(toSkip); + } else { + skippedTransactions = local = toSkip; + } + skipIfNeeded(local); + } finally { + lock.unlock(); + } + } + + private void skipTransactions() { + var local = skippedTransactions; + if (local != null) { + lock.lock(); + try { + local = skippedTransactions; + if (local != null && successor == null) { + skippedTransactions = null; + doSkipTransactions(local); + } + } finally { + lock.unlock(); + } + } + } + + @Holding("lock") + private void doSkipTransactions(final List toSkip) { + final var txIds = toSkip.stream() + .mapToLong(TransactionIdentifier::getTransactionId) + .distinct() + .sorted() + .mapToObj(UnsignedLong::fromLongBits) + .collect(ImmutableList.toImmutableList()); + + LOG.debug("Proxy {} skipping transactions {}", this, txIds); + connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier, + txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> { + LOG.debug("Proxy {} confirmed transaction skip", this); + }, connection.currentTime()); + } + final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { @@ -448,10 +561,12 @@ abstract class ProxyHistory implements Identifiable { final void enqueueRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { + skipTransactions(); connection.enqueueRequest(request, callback, enqueuedTicks); } final void sendRequest(final TransactionRequest request, final Consumer> callback) { + skipTransactions(); connection.sendRequest(request, callback); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 22536cc50a..022bb7aa07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.UnsignedLong; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -24,6 +25,8 @@ import org.opendaylight.controller.cluster.access.commands.DeadTransactionExcept import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; @@ -78,6 +81,8 @@ abstract class AbstractFrontendHistory implements Identifiable(others.size() + 1); + ids.add(UnsignedLong.fromLongBits(first.getTransactionId())); + ids.addAll(others); + + final var it = ids.iterator(); + while (it.hasNext()) { + final var id = it.next(); + final long bits = id.longValue(); + if (purgedTransactions.contains(bits)) { + LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id); + it.remove(); + } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) { + LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id); + it.remove(); + } + } + + if (ids.isEmpty()) { + LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier()); + return new SkipTransactionsResponse(first, now); + } + + final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray()) + .immutableCopy(); + LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges()); + + tree.skipTransactions(getIdentifier(), transactionIds, () -> { + purgedTransactions.addAll(transactionIds); + envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now); + }); + return null; + } + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); tree.closeTransactionChain(getIdentifier(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java index acb585e080..7f281ab0f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java @@ -74,6 +74,11 @@ abstract class FrontendClientMetadataBuilder implements Builder m : metadata) { + m.onTransactionsSkipped(historyId, txIds); + } + } + /** * Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)}, * this method is used for re-establishing state when we are taking over @@ -695,6 +712,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); } + final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds, + final Runnable callback) { + final ShardDataTreeTransactionChain chain = transactionChains.get(id); + if (chain == null) { + LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id); + if (callback != null) { + callback.run(); + } + return; + } + + replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds, + shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback); + } + final Optional readCurrentData() { return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty()) .map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java index e2ca7cab2c..76719f94f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeMetadata.java @@ -13,6 +13,7 @@ import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; abstract class ShardDataTreeMetadata> { /** @@ -61,6 +62,8 @@ abstract class ShardDataTreeMetadata> abstract void onTransactionPurged(TransactionIdentifier txId); + abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds); + abstract void onHistoryCreated(LocalHistoryIdentifier historyId); abstract void onHistoryClosed(LocalHistoryIdentifier historyId); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java new file mode 100644 index 0000000000..ec6e227a75 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayload.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import com.google.common.io.ByteStreams; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.DataInput; +import java.io.IOException; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload persisted when a local history is instructed some transaction identifiers, i.e. the frontend has used them + * for other purposes. It contains a {@link LocalHistoryIdentifier} and a list of transaction identifiers within that + * local history. + */ +public final class SkipTransactionsPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + private ImmutableUnsignedLongSet transactionIds; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException { + final var id = LocalHistoryIdentifier.readFrom(in); + transactionIds = ImmutableUnsignedLongSet.readFrom(in); + return id; + } + + @Override + protected SkipTransactionsPayload createObject(final LocalHistoryIdentifier identifier, + final byte[] serialized) { + return new SkipTransactionsPayload(identifier, serialized, verifyNotNull(transactionIds)); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class); + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy") + private final @NonNull ImmutableUnsignedLongSet transactionIds; + + private SkipTransactionsPayload(final @NonNull LocalHistoryIdentifier historyId, + final byte @NonNull [] serialized, final ImmutableUnsignedLongSet transactionIds) { + super(historyId, serialized); + this.transactionIds = requireNonNull(transactionIds); + } + + public static @NonNull SkipTransactionsPayload create(final LocalHistoryIdentifier historyId, + final ImmutableUnsignedLongSet transactionIds, final int initialSerializedBufferCapacity) { + final var out = ByteStreams.newDataOutput(initialSerializedBufferCapacity); + try { + historyId.writeTo(out); + transactionIds.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {} ids {}", historyId, transactionIds, e); + throw new RuntimeException("Failed to serialize " + historyId + " ids " + transactionIds, e); + } + + return new SkipTransactionsPayload(historyId, out.toByteArray(), transactionIds); + } + + public @NonNull ImmutableUnsignedLongSet getTransactionIds() { + return transactionIds; + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java index a8ba69beca..f332bcfabf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandleTest.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -19,10 +21,9 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.testkit.TestProbe; import akka.testkit.javadsl.TestKit; -import java.util.Collection; -import java.util.Collections; +import java.util.List; +import java.util.Map; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -82,8 +83,7 @@ public abstract class AbstractClientHandleTest command = clientContextProbe.expectMsgClass(InternalCommand.class); command.execute(client); //data tree mock @@ -111,7 +111,7 @@ public abstract class AbstractClientHandleTest envelope = backendProbe.expectMsgClass(Envelope.class); final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage(); - Assert.assertEquals(TRANSACTION_ID, request.getTarget()); + assertEquals(TRANSACTION_ID, request.getTarget()); checkClosed(); } @@ -130,28 +130,28 @@ public abstract class AbstractClientHandleTest envelope = backendProbe.expectMsgClass(Envelope.class); final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage(); - Assert.assertEquals(TRANSACTION_ID, request.getTarget()); + assertEquals(TRANSACTION_ID, request.getTarget()); checkClosed(); } @Test public void testEnsureClosed() { doHandleOperation(handle); - final Collection transactions = handle.ensureClosed(); - Assert.assertNotNull(transactions); - Assert.assertEquals(1, transactions.size()); + final Map transactions = handle.ensureClosed(); + assertNotNull(transactions); + assertEquals(1, transactions.size()); } @Test public void testEnsureProxy() { final AbstractProxyTransaction expected = mock(AbstractProxyTransaction.class); final AbstractProxyTransaction proxy = handle.ensureProxy(PATH); - Assert.assertEquals(0, proxy.getIdentifier().getTransactionId()); + assertEquals(0, proxy.getIdentifier().getTransactionId()); } @Test public void testParent() { - Assert.assertEquals(parent, handle.parent()); + assertEquals(parent, handle.parent()); } protected void checkClosed() throws Exception { @@ -170,7 +170,7 @@ public abstract class AbstractClientHandleTest> R backendRespondToRequest(final Class expectedRequestClass, final Response response) { final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class); - Assert.assertEquals(expectedRequestClass, envelope.getMessage().getClass()); + assertEquals(expectedRequestClass, envelope.getMessage().getClass()); final AbstractClientConnection connection = client.getConnection(0L); final long sessionId = envelope.getSessionId(); final long txSequence = envelope.getTxSequence(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 818e4484e1..a8faac46cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -401,21 +401,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } - private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { + private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) { // ask based should track no metadata assertEquals(List.of(), clientMeta.getCurrentHistories()); } - private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { + private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) { final var iterator = clientMeta.getCurrentHistories().iterator(); var metadata = iterator.next(); while (iterator.hasNext() && metadata.getHistoryId() != 1) { metadata = iterator.next(); } - // FIXME: CONTROLLER-1991: remove this assumption - assumeTrue(false); - assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions()); assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString()); } @@ -441,21 +438,17 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { int numCars = 5; for (int i = 0; i < numCars; i++) { - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.close(); + try (var tx = txChain.newWriteOnlyTransaction()) { + // Empty on purpose + } try (var tx = txChain.newReadOnlyTransaction()) { tx.read(CarsModel.BASE_PATH).get(); } } - writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - followerTestKit.doCommit(writeTx.ready()); - // wait to let the shard catch up with purged - await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + await("wait for purges to settle").atMost(5, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> { final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars") @@ -466,17 +459,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final var clientMeta = frontendMetadata.getClients().get(0); if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - assertTellClientMetadata(clientMeta, numCars * 2 + 1); + assertTellClientMetadata(clientMeta, numCars * 2); } else { assertAskClientMetadata(clientMeta); } }); - - try (var tx = txChain.newReadOnlyTransaction()) { - final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body(); - assertThat(body, instanceOf(Collection.class)); - assertEquals(numCars, ((Collection) body).size()); - } } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java new file mode 100644 index 0000000000..44012d81d6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/SkipTransactionsPayloadTest.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet; + +public class SkipTransactionsPayloadTest extends AbstractIdentifiablePayloadTest { + @Override + SkipTransactionsPayload object() { + return SkipTransactionsPayload.create(nextHistoryId(), MutableUnsignedLongSet.of(42).immutableCopy(), 512); + } +} -- 2.36.6