--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Collection;
+import java.util.List;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Request to skip a number of {@link TransactionIdentifier}s within a {@code local history}. This request is essentially
+ * equivalent to {@link TransactionPurgeRequest} for {@link #getTarget()}, but also carries additional sibling
+ * {@link TransactionIdentifier}s in {@link #getOthers()}.
+ *
+ * <p>
+ * This request is sent by the frontend to inform the backend that a set of {@link TransactionIdentifier}s are
+ * explicitly retired and are guaranteed to never be used by the frontend.
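+ *
+ * <p>
+ * A purely illustrative construction sketch, where {@code txId7}, {@code sequence} and {@code replyTo} are
+ * placeholders: a frontend which allocated transactions {@code 7} and {@code 8} in a local history without using
+ * them on this shard could retire both with
+ * <pre>{@code
+ *     new SkipTransactionsRequest(txId7, sequence, replyTo, List.of(UnsignedLong.valueOf(8)));
+ * }</pre>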
+ */
+@Beta
+public final class SkipTransactionsRequest extends TransactionRequest<SkipTransactionsRequest> {
+ private static final long serialVersionUID = 1L;
+
+ // Note: the choice of UnsignedLong is arbitrary; yang.common.Uint64 would work just as well. What we really want
+ // is an immutable List<long>, though.
+ private final @NonNull ImmutableList<UnsignedLong> others;
+
+ public SkipTransactionsRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo, final Collection<UnsignedLong> others) {
+ super(target, sequence, replyTo);
+ this.others = ImmutableList.copyOf(others);
+ }
+
+ /**
+ * Return the {@link TransactionIdentifier#getTransactionId()} values of {@link #getTarget()}'s siblings.
+ *
+ * @return Sibling {@link TransactionIdentifier#getTransactionId()} values
+ */
+ public List<UnsignedLong> getOthers() {
+ return others;
+ }
+
+ @Override
+ protected SkipTransactionsRequestV1 externalizableProxy(final ABIVersion version) {
+ return new SkipTransactionsRequestV1(this);
+ }
+
+ @Override
+ protected SkipTransactionsRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+
+ @Override
+ protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ final var helper = super.addToStringAttributes(toStringHelper);
+ if (!others.isEmpty()) {
+ helper.add("others", others);
+ }
+ return helper;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
+/**
+ * Externalizable proxy for use with {@link SkipTransactionsRequest}. It implements the initial
+ * (Phosphorus SR1) serialization format.
+ */
+final class SkipTransactionsRequestV1 extends AbstractTransactionRequestProxy<SkipTransactionsRequest> {
+ private static final long serialVersionUID = 1L;
+
+ private List<UnsignedLong> others;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public SkipTransactionsRequestV1() {
+ // For Externalizable
+ }
+
+ SkipTransactionsRequestV1(final SkipTransactionsRequest request) {
+ super(request);
+ others = request.getOthers();
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ final int size = in.readInt();
+ final var builder = ImmutableList.<UnsignedLong>builderWithExpectedSize(size);
+ int idx;
+ if (size % 2 != 0) {
+ builder.add(UnsignedLong.fromLongBits(WritableObjects.readLong(in)));
+ idx = 1;
+ } else {
+ idx = 0;
+ }
+ for (; idx < size; idx += 2) {
+ final byte hdr = WritableObjects.readLongHeader(in);
+ builder.add(UnsignedLong.fromLongBits(WritableObjects.readFirstLong(in, hdr)));
+ builder.add(UnsignedLong.fromLongBits(WritableObjects.readSecondLong(in, hdr)));
+ }
+ others = builder.build();
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
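+ // Wire format: an int count followed by the transaction ids encoded via WritableObjects. When the count is odd,
+ // the first id is written standalone; the remainder are written in pairs via writeLongs(), which packs the two
+ // length headers into a single byte (matching the readLongHeader()/readFirstLong()/readSecondLong() read side).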
+ final int size = others.size();
+ out.writeInt(size);
+
+ int idx;
+ if (size % 2 != 0) {
+ WritableObjects.writeLong(out, others.get(0).longValue());
+ idx = 1;
+ } else {
+ idx = 0;
+ }
+ for (; idx < size; idx += 2) {
+ WritableObjects.writeLongs(out, others.get(idx).longValue(), others.get(idx + 1).longValue());
+ }
+ }
+
+ @Override
+ protected SkipTransactionsRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyToActor) {
+ return new SkipTransactionsRequest(target, sequence, replyToActor, others);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link SkipTransactionsRequest}.
+ */
+@Beta
+public final class SkipTransactionsResponse extends TransactionSuccess<SkipTransactionsResponse> {
+ private static final long serialVersionUID = 1L;
+
+ public SkipTransactionsResponse(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<SkipTransactionsResponse> externalizableProxy(
+ final ABIVersion version) {
+ return new SkipTransactionsResponseProxyV1(this);
+ }
+
+ @Override
+ protected SkipTransactionsResponse cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link SkipTransactionsResponse}. It implements the initial (Phosphorus SR1)
+ * serialization format.
+ */
+final class SkipTransactionsResponseProxyV1 extends AbstractTransactionSuccessProxy<SkipTransactionsResponse> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public SkipTransactionsResponseProxyV1() {
+ // For Externalizable
+ }
+
+ SkipTransactionsResponseProxyV1(final SkipTransactionsResponse success) {
+ super(success);
+ }
+
+ @Override
+ protected SkipTransactionsResponse createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new SkipTransactionsResponse(target, sequence);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.primitives.UnsignedLong;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+public class SkipTransactionsRequestTest extends AbstractTransactionRequestTest<SkipTransactionsRequest> {
+ private static final SkipTransactionsRequest OBJECT = new SkipTransactionsRequest(
+ TRANSACTION_IDENTIFIER, 0, ACTOR_REF, List.of(UnsignedLong.ONE));
+
+ @Override
+ protected SkipTransactionsRequest object() {
+ return OBJECT;
+ }
+
+ @Test
+ public void cloneAsVersionTest() {
+ final SkipTransactionsRequest clone = OBJECT.cloneAsVersion(ABIVersion.BORON);
+ assertEquals(OBJECT, clone);
+ }
+
+ @Override
+ protected void doAdditionalAssertions(final Object deserialize) {
+ assertThat(deserialize, instanceOf(SkipTransactionsRequest.class));
+ assertEquals(OBJECT.getReplyTo(), ((SkipTransactionsRequest) deserialize).getReplyTo());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+public class SkipTransactionsResponseTest extends AbstractTransactionSuccessTest<SkipTransactionsResponse> {
+ private static final SkipTransactionsResponse OBJECT = new SkipTransactionsResponse(
+ TRANSACTION_IDENTIFIER, 0);
+
+ @Override
+ protected SkipTransactionsResponse object() {
+ return OBJECT;
+ }
+
+ @Test
+ public void cloneAsVersionTest() {
+ final SkipTransactionsResponse clone = OBJECT.cloneAsVersion(ABIVersion.BORON);
+ assertEquals(OBJECT, clone);
+ }
+
+ @Override
+ protected void doAdditionalAssertions(final Object deserialize) {
+ assertThat(deserialize, instanceOf(SkipTransactionsResponse.class));
+ }
+}
*
* <p>
* Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
- * should never be called from an application thread.
+ * should never be called from an application thread and serves mostly for moving requests between queues.
*
* @param request Request to send
* @param callback Callback to invoke
import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
-import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.eclipse.jdt.annotation.NonNull;
}
private boolean commonAbort() {
- final Collection<T> toClose = ensureClosed();
+ final Map<Long, T> toClose = ensureClosed();
if (toClose == null) {
return false;
}
- toClose.forEach(AbstractProxyTransaction::abort);
+ toClose.values().forEach(AbstractProxyTransaction::abort);
+ parent.onTransactionShardsBound(transactionId, toClose.keySet());
return true;
}
* Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of
* {@link AbstractProxyTransaction} handles which need to be closed, too.
*
- * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be
+ * @return null if this snapshot has already been closed, otherwise a map of shard cookies to proxies which need to be
* closed, too.
*/
- final @Nullable Collection<T> ensureClosed() {
+ final @Nullable Map<Long, T> ensureClosed() {
// volatile read and a conditional CAS. This ends up being better in the typical case when we are invoked more
// than once (see ClientBackedTransaction) than performing a STATE_UPDATER.getAndSet().
final State<T> local = state;
- return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local.values() : null;
+ return local != null && STATE_UPDATER.compareAndSet(this, local, null) ? local : null;
}
final T ensureProxy(final YangInstanceIdentifier path) {
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@Holding("this")
abstract ClientTransaction doCreateTransaction();
+ /**
+ * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+ * completing with a set of participating shards.
+ *
+ * @param txId Transaction identifier
+ * @param participatingShards Participating shard cookies
+ */
+ final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+ // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+ // will not see the holes caused by this.
+ final long stamp = lock.readLock();
+ try {
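+ // Every proxy history whose shard cookie did not participate in this transaction now has a hole at txId;
+ // have it record the identifier for a later SkipTransactionsRequest.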
+ for (var entry : histories.entrySet()) {
+ if (!participatingShards.contains(entry.getKey())) {
+ entry.getValue().skipTransaction(txId);
+ }
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
/**
* Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
*
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.FluentFuture;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
}
public DOMStoreThreePhaseCommitCohort ready() {
- final Collection<AbstractProxyTransaction> toReady = ensureClosed();
- checkState(toReady != null, "Attempted to submit a closed transaction %s", this);
+ final Map<Long, AbstractProxyTransaction> participants = ensureClosed();
+ checkState(participants != null, "Attempted to submit a closed transaction %s", this);
+ final Collection<AbstractProxyTransaction> toReady = participants.values();
toReady.forEach(AbstractProxyTransaction::seal);
+
+ final TransactionIdentifier txId = getIdentifier();
+ final AbstractClientHistory parent = parent();
+ parent.onTransactionShardsBound(txId, participants.keySet());
+
final AbstractTransactionCommitCohort cohort;
switch (toReady.size()) {
case 0:
- cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier());
+ cohort = new EmptyTransactionCommitCohort(parent, txId);
break;
case 1:
- cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), toReady.iterator().next());
+ cohort = new DirectTransactionCommitCohort(parent, txId, toReady.iterator().next());
break;
default:
- cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady);
+ cohort = new ClientTransactionCommitCohort(parent, txId, toReady);
break;
}
- return parent().onTransactionReady(this, cohort);
+ return parent.onTransactionReady(this, cohort);
}
@Override
import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
t.replayMessages(successor, previousEntries);
}
+ // Forward any skipped transactions
+ final var local = skippedTransactions;
+ if (local != null) {
+ LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+ successor.skipTransactions(local);
+ skippedTransactions = null;
+ }
+
// Now look for any finalizing messages
it = previousEntries.iterator();
while (it.hasNext()) {
@GuardedBy("lock")
private ProxyHistory successor;
+ // FIXME: this is not tuneable, but perhaps should be
+ // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+ private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+ // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+ // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+ // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+ //
+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+ // into purge requests when:
+ // - we are about to allocate a new transaction
+ // - we get a successor proxy
+ // - the list grows unreasonably long
+ //
+ // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+ // have a {@code List<long>}.
+ @GuardedBy("lock")
+ private volatile List<TransactionIdentifier> skippedTransactions;
+
private ProxyHistory(final AbstractClientHistory parent,
final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
this.parent = requireNonNull(parent);
}
}
+ final void skipTransaction(final TransactionIdentifier txId) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransaction(txId);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local == null) {
+ skippedTransactions = local = new ArrayList<>();
+ }
+ local.add(txId);
+ LOG.debug("Recorded skipped transaction {}", txId);
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Holding("lock")
+ private void skipIfNeeded(final List<TransactionIdentifier> current) {
+ if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+ skippedTransactions = null;
+ doSkipTransactions(current);
+ }
+ }
+
+ private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransactions(toSkip);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local != null) {
+ local.addAll(toSkip);
+ } else {
+ skippedTransactions = local = toSkip;
+ }
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void skipTransactions() {
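+ // Volatile read: in the common case there is nothing to flush, so we avoid taking the lock entirely. If there
+ // is, we re-check under the lock, as a concurrent flush or a successor hand-off may have raced us.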
+ var local = skippedTransactions;
+ if (local != null) {
+ lock.lock();
+ try {
+ local = skippedTransactions;
+ if (local != null && successor == null) {
+ skippedTransactions = null;
+ doSkipTransactions(local);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Holding("lock")
+ private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+ final var txIds = toSkip.stream()
+ .mapToLong(TransactionIdentifier::getTransactionId)
+ .distinct()
+ .sorted()
+ .mapToObj(UnsignedLong::fromLongBits)
+ .collect(ImmutableList.toImmutableList());
+
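+ // The lowest transaction id doubles as the request's target; the remainder travel as "others". The backend
+ // reassembles the full set from both parts.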
+ LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+ connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+ txIds.get(0).longValue()), 0, localActor(), txIds.subList(1, txIds.size())), resp -> {
+ LOG.debug("Proxy {} confirmed transaction skip", this);
+ }, connection.currentTime());
+ }
+
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
final long enqueuedTicks) {
+ skipTransactions();
connection.enqueueRequest(request, callback, enqueuedTicks);
}
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ skipTransactions();
connection.sendRequest(request, callback);
}
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.UnsignedLong;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
final RequestEnvelope envelope, final long now) throws RequestException {
if (request instanceof TransactionPurgeRequest) {
return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now);
+ } else if (request instanceof SkipTransactionsRequest) {
+ return handleSkipTransactionsRequest((SkipTransactionsRequest) request, envelope, now);
}
final TransactionIdentifier id = request.getTarget();
return null;
}
+ private SkipTransactionsResponse handleSkipTransactionsRequest(final SkipTransactionsRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ final var first = request.getTarget();
+ final var others = request.getOthers();
+ final var ids = new ArrayList<UnsignedLong>(others.size() + 1);
+ ids.add(UnsignedLong.fromLongBits(first.getTransactionId()));
+ ids.addAll(others);
+
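+ // Weed out identifiers we cannot skip: those we already track as purged and those we still track as open.
+ // Either case points to a frontend/backend disagreement, hence the warnings below.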
+ final var it = ids.iterator();
+ while (it.hasNext()) {
+ final var id = it.next();
+ final long bits = id.longValue();
+ if (purgedTransactions.contains(bits)) {
+ LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id);
+ it.remove();
+ } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) {
+ LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id);
+ it.remove();
+ }
+ }
+
+ if (ids.isEmpty()) {
+ LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier());
+ return new SkipTransactionsResponse(first, now);
+ }
+
+ final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray())
+ .immutableCopy();
+ LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges());
+
+ tree.skipTransactions(getIdentifier(), transactionIds, () -> {
+ purgedTransactions.addAll(transactionIds);
+ envelope.sendSuccess(new SkipTransactionsResponse(first, request.getSequence()), readTime() - now);
+ });
+ return null;
+ }
+
final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
tree.closeTransactionChain(getIdentifier(),
// No-op
}
+ @Override
+ void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+ // No-op
+ }
+
@Override
LeaderFrontendState toLeaderState(final Shard shard) {
return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
}
}
+ @Override
+ void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+ final FrontendHistoryMetadataBuilder history = getHistory(historyId);
+ if (history != null) {
+ history.onTransactionsSkipped(txIds);
+ LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
+ } else {
+ LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
+ }
+ }
+
@Override
LeaderFrontendState toLeaderState(final Shard shard) {
// Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
}
private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
- LocalHistoryIdentifier historyId = txId.getHistoryId();
+ return getHistory(txId.getHistoryId());
+ }
+
+ private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
+ final LocalHistoryIdentifier local;
if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
// We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
// needs to account for that.
LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
- historyId = standaloneId;
+ local = standaloneId;
+ } else {
+ local = historyId;
}
- return currentHistories.get(historyId);
+ return currentHistories.get(local);
}
private LocalHistoryIdentifier standaloneHistoryId() {
static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
// Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
// either purged or active
- return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
+ return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
? new Disabled(shardName, meta.getIdentifier()) : new Enabled(shardName, meta);
}
abstract void onTransactionPurged(TransactionIdentifier txId);
+ abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
/**
* Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
*
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap;
import org.opendaylight.yangtools.concepts.Builder;
purgedTransactions.add(txidBits);
}
+ void onTransactionsSkipped(final ImmutableUnsignedLongSet txIds) {
+ purgedTransactions.addAll(txIds);
+ }
+
/**
* Transform frontend metadata for a particular client history into its {@link LocalFrontendHistory} counterpart.
*
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
ensureClient(txId.getHistoryId().getClientId()).onTransactionPurged(txId);
}
+ @Override
+ void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+ ensureClient(historyId.getClientId()).onTransactionsSkipped(historyId, txIds);
+ }
+
/**
* Transform frontend metadata into an active leader state map.
*
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.datastore.persisted.SkipTransactionsPayload;
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
void updateSchemaContext(final @NonNull EffectiveModelContext newSchemaContext) {
dataTree.setEffectiveModelContext(newSchemaContext);
- this.schemaContext = newSchemaContext;
- this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
+ schemaContext = newSchemaContext;
+ dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
}
void resetTransactionBatch() {
allMetadataClosedLocalHistory(((CloseLocalHistoryPayload) payload).getIdentifier());
} else if (payload instanceof PurgeLocalHistoryPayload) {
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.debug("{}: ignoring unhandled payload {}", logContext, payload);
}
payloadReplicationComplete((PurgeLocalHistoryPayload)payload);
}
allMetadataPurgedLocalHistory(((PurgeLocalHistoryPayload) payload).getIdentifier());
+ } else if (payload instanceof SkipTransactionsPayload) {
+ if (identifier != null) {
+ payloadReplicationComplete((SkipTransactionsPayload)payload);
+ }
+ allMetadataSkipTransactions((SkipTransactionsPayload) payload);
} else {
LOG.warn("{}: ignoring unhandled identifier {} payload {}", logContext, identifier, payload);
}
}
}
+ private void allMetadataSkipTransactions(final SkipTransactionsPayload payload) {
+ final var historyId = payload.getIdentifier();
+ final var txIds = payload.getTransactionIds();
+ for (ShardDataTreeMetadata<?> m : metadata) {
+ m.onTransactionsSkipped(historyId, txIds);
+ }
+ }
+
/**
* Create a transaction chain for specified history. Unlike {@link #ensureTransactionChain(LocalHistoryIdentifier)},
* this method is used for re-establishing state when we are taking over
id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
}
- Optional<DataTreeCandidate> readCurrentData() {
+ final void skipTransactions(final LocalHistoryIdentifier id, final ImmutableUnsignedLongSet transactionIds,
+ final Runnable callback) {
+ final ShardDataTreeTransactionChain chain = transactionChains.get(id);
+ if (chain == null) {
+ LOG.debug("{}: Skipping on non-existent transaction chain {}", logContext, id);
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+
+ replicatePayload(id, SkipTransactionsPayload.create(id, transactionIds,
+ shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+ }
+
+ final Optional<DataTreeCandidate> readCurrentData() {
return dataTree.takeSnapshot().readNode(YangInstanceIdentifier.empty())
.map(state -> DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), state));
}
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshotMetadata;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
abstract class ShardDataTreeMetadata<T extends ShardDataTreeSnapshotMetadata<T>> {
/**
abstract void onTransactionPurged(TransactionIdentifier txId);
+ abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.io.ByteStreams;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.DataInput;
+import java.io.IOException;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload persisted when a local history is instructed to skip some transaction identifiers, i.e. the frontend has
+ * used them for other purposes. It contains a {@link LocalHistoryIdentifier} and a list of transaction identifiers
+ * within that local history.
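+ *
+ * <p>
+ * A minimal creation sketch, mirroring the unit test; the {@code 512} is merely an initial serialization buffer
+ * capacity hint:
+ * <pre>{@code
+ *     SkipTransactionsPayload.create(historyId, MutableUnsignedLongSet.of(42).immutableCopy(), 512);
+ * }</pre>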
+ */
+public final class SkipTransactionsPayload extends AbstractIdentifiablePayload<LocalHistoryIdentifier> {
+ private static final class Proxy extends AbstractProxy<LocalHistoryIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ private ImmutableUnsignedLongSet transactionIds;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final byte[] serialized) {
+ super(serialized);
+ }
+
+ @Override
+ protected LocalHistoryIdentifier readIdentifier(final DataInput in) throws IOException {
+ final var id = LocalHistoryIdentifier.readFrom(in);
+ transactionIds = ImmutableUnsignedLongSet.readFrom(in);
+ return id;
+ }
+
+ @Override
+ protected SkipTransactionsPayload createObject(final LocalHistoryIdentifier identifier,
+ final byte[] serialized) {
+ return new SkipTransactionsPayload(identifier, serialized, verifyNotNull(transactionIds));
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(SkipTransactionsPayload.class);
+ private static final long serialVersionUID = 1L;
+
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via externalizable proxy")
+ private final @NonNull ImmutableUnsignedLongSet transactionIds;
+
+ private SkipTransactionsPayload(final @NonNull LocalHistoryIdentifier historyId,
+ final byte @NonNull [] serialized, final ImmutableUnsignedLongSet transactionIds) {
+ super(historyId, serialized);
+ this.transactionIds = requireNonNull(transactionIds);
+ }
+
+ public static @NonNull SkipTransactionsPayload create(final LocalHistoryIdentifier historyId,
+ final ImmutableUnsignedLongSet transactionIds, final int initialSerializedBufferCapacity) {
+ final var out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
+ try {
+ historyId.writeTo(out);
+ transactionIds.writeTo(out);
+ } catch (IOException e) {
+ // This should never happen
+ LOG.error("Failed to serialize {} ids {}", historyId, transactionIds, e);
+ throw new RuntimeException("Failed to serialize " + historyId + " ids " + transactionIds, e);
+ }
+
+ return new SkipTransactionsPayload(historyId, out.toByteArray(), transactionIds);
+ }
+
+ public @NonNull ImmutableUnsignedLongSet getTransactionIds() {
+ return transactionIds;
+ }
+
+ @Override
+ protected Proxy externalizableProxy(final byte[] serialized) {
+ return new Proxy(serialized);
+ }
+}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import akka.actor.ActorSystem;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
client.getConnection(0L);
contextProbe.expectMsgClass(ConnectClientRequest.class);
final long sequence = 0L;
- contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(),
- Collections.emptyList(), dataTree, 3));
+ contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(), List.of(), dataTree, 3));
final InternalCommand<ShardBackendInfo> command = clientContextProbe.expectMsgClass(InternalCommand.class);
command.execute(client);
//data tree mock
@Test
public void testGetIdentifier() {
- Assert.assertEquals(TRANSACTION_ID, handle.getIdentifier());
+ assertEquals(TRANSACTION_ID, handle.getIdentifier());
}
@Test
handle.abort();
final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
- Assert.assertEquals(TRANSACTION_ID, request.getTarget());
+ assertEquals(TRANSACTION_ID, request.getTarget());
checkClosed();
}
handle.localAbort(new RuntimeException("fail"));
final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
- Assert.assertEquals(TRANSACTION_ID, request.getTarget());
+ assertEquals(TRANSACTION_ID, request.getTarget());
checkClosed();
}
@Test
public void testEnsureClosed() {
doHandleOperation(handle);
- final Collection<AbstractProxyTransaction> transactions = handle.ensureClosed();
- Assert.assertNotNull(transactions);
- Assert.assertEquals(1, transactions.size());
+ final Map<Long, AbstractProxyTransaction> transactions = handle.ensureClosed();
+ assertNotNull(transactions);
+ assertEquals(1, transactions.size());
}
@Test
public void testEnsureProxy() {
final AbstractProxyTransaction expected = mock(AbstractProxyTransaction.class);
final AbstractProxyTransaction proxy = handle.ensureProxy(PATH);
- Assert.assertEquals(0, proxy.getIdentifier().getTransactionId());
+ assertEquals(0, proxy.getIdentifier().getTransactionId());
}
@Test
public void testParent() {
- Assert.assertEquals(parent, handle.parent());
+ assertEquals(parent, handle.parent());
}
protected void checkClosed() throws Exception {
protected <R extends Request<?, R>> R backendRespondToRequest(final Class<R> expectedRequestClass,
final Response<?, ?> response) {
final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
- Assert.assertEquals(expectedRequestClass, envelope.getMessage().getClass());
+ assertEquals(expectedRequestClass, envelope.getMessage().getClass());
final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(0L);
final long sessionId = envelope.getSessionId();
final long txSequence = envelope.getTxSequence();
}
}
- private void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
+ private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
// ask based should track no metadata
assertEquals(List.of(), clientMeta.getCurrentHistories());
}
- private void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+ private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
final var iterator = clientMeta.getCurrentHistories().iterator();
var metadata = iterator.next();
while (iterator.hasNext() && metadata.getHistoryId() != 1) {
metadata = iterator.next();
}
- // FIXME: CONTROLLER-1991: remove this assumption
- assumeTrue(false);
-
assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions());
assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString());
}
int numCars = 5;
for (int i = 0; i < numCars; i++) {
- writeTx = txChain.newWriteOnlyTransaction();
- writeTx.close();
+ try (var tx = txChain.newWriteOnlyTransaction()) {
+ // Empty on purpose
+ }
try (var tx = txChain.newReadOnlyTransaction()) {
tx.read(CarsModel.BASE_PATH).get();
}
}
- writeTx = txChain.newWriteOnlyTransaction();
- writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- followerTestKit.doCommit(writeTx.ready());
-
// wait to let the shard catch up with purged
- await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+ await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
final var clientMeta = frontendMetadata.getClients().get(0);
if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellClientMetadata(clientMeta, numCars * 2 + 1);
+ assertTellClientMetadata(clientMeta, numCars * 2);
} else {
assertAskClientMetadata(clientMeta);
}
});
-
- try (var tx = txChain.newReadOnlyTransaction()) {
- final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().getValue();
- assertThat(body, instanceOf(Collection.class));
- assertEquals(numCars, ((Collection<?>) body).size());
- }
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
+
+public class SkipTransactionsPayloadTest extends AbstractIdentifiablePayloadTest<SkipTransactionsPayload> {
+ @Override
+ SkipTransactionsPayload object() {
+ return SkipTransactionsPayload.create(nextHistoryId(), MutableUnsignedLongSet.of(42).immutableCopy(), 512);
+ }
+}