X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=31bf000ac2123a0bcc0c573fbedc84fba7247bc3;hp=1adca56af2680a32a5c4f68afe39bc1d0f79533d;hb=HEAD;hpb=db9a673c114febc785fbd324947ac2c3e3095d06 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 1adca56af2..d00db5757e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -7,22 +7,37 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.UnsignedLong; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import javax.annotation.Nullable; +import java.util.SortedSet; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +49,24 @@ import org.slf4j.LoggerFactory; */ abstract class AbstractFrontendHistory implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class); - private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0); private final Map transactions = new HashMap<>(); + private final MutableUnsignedLongSet purgedTransactions; private final String persistenceId; - private final Ticker ticker; + private final ShardDataTree tree; + + /** + * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or + * aborted (false). We only ever shrink these. + */ + private Map closedTransactions; - AbstractFrontendHistory(final String persistenceId, final Ticker ticker) { - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.ticker = Preconditions.checkNotNull(ticker); + AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree, + final Map closedTransactions, final MutableUnsignedLongSet purgedTransactions) { + this.persistenceId = requireNonNull(persistenceId); + this.tree = requireNonNull(tree); + this.closedTransactions = requireNonNull(closedTransactions); + this.purgedTransactions = requireNonNull(purgedTransactions); } final String persistenceId() { @@ -50,36 +74,46 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - - // FIXME: handle purging of transactions + if (request instanceof TransactionPurgeRequest purgeRequest) { + return handleTransactionPurgeRequest(purgeRequest, envelope, now); + } else if (request instanceof SkipTransactionsRequest skipRequest) { + return handleSkipTransactionsRequest(skipRequest, envelope, now); + } final TransactionIdentifier id = request.getTarget(); + final long txidBits = id.getTransactionId(); + if (purgedTransactions.contains(txidBits)) { + LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions); + throw new DeadTransactionException(purgedTransactions.toRangeSet()); + } + + final Boolean closed = closedTransactions.get(UnsignedLong.fromLongBits(txidBits)); + if (closed != null) { + final boolean successful = closed; + LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful" + : "failed"); + throw new ClosedTransactionException(successful); + } + FrontendTransaction tx = transactions.get(id); if (tx == null) { // The transaction does not exist and we are about to create it, check sequence number if (request.getSequence() != 0) { - LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); - throw UNSEQUENCED_START; - } - - if (request instanceof CommitLocalTransactionRequest) { - tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); - LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id); - } else { - tx = createOpenTransaction(id); - LOG.debug("{}: allocated new open transaction {}", persistenceId(), id); + LOG.warn("{}: no transaction state present, unexpected request {}", persistenceId(), request); + throw new OutOfOrderRequestException(0); } + tx = createTransaction(request, id); transactions.put(id, tx); - } else { + } else if (!(request instanceof IncrementTransactionSequenceRequest)) { final Optional> maybeReplay = tx.replaySequence(request.getSequence()); if (maybeReplay.isPresent()) { - final TransactionSuccess replay = maybeReplay.get(); + final TransactionSuccess replay = maybeReplay.orElseThrow(); LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); return replay; } @@ -88,10 +122,142 @@ abstract class AbstractFrontendHistory implements Identifiable { + closedTransactions.remove(ul); + if (closedTransactions.isEmpty()) { + closedTransactions = ImmutableMap.of(); + } + + purgedTransactions.add(txidBits); + LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + final FrontendTransaction tx = transactions.get(id); + if (tx == null) { + // This should never happen because the purge callback removes the transaction and puts it into + // purged transactions in one go. If it does, we warn about the situation and + LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId, + id, purgedTransactions); + purgedTransactions.add(txidBits); + return new TransactionPurgeResponse(id, request.getSequence()); + } + + tree.purgeTransaction(id, () -> { + purgedTransactions.add(txidBits); + transactions.remove(id); + LOG.debug("{}: finished purging transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + + return null; + } + + private SkipTransactionsResponse handleSkipTransactionsRequest(final SkipTransactionsRequest request, + final RequestEnvelope envelope, final long now) { + final var first = request.getTarget(); + final var others = request.getOthers(); + final var ids = new ArrayList(others.size() + 1); + ids.add(UnsignedLong.fromLongBits(first.getTransactionId())); + ids.addAll(others); + + final var it = ids.iterator(); + while (it.hasNext()) { + final var id = it.next(); + final long bits = id.longValue(); + if (purgedTransactions.contains(bits)) { + LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id); + it.remove(); + } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) { + LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id); + it.remove(); + } + } + + if (ids.isEmpty()) { + LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier()); + return new SkipTransactionsResponse(first, now); + } + + final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray()) + .immutableCopy(); + LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges()); + + tree.skipTransactions(getIdentifier(), transactionIds, () -> { + purgedTransactions.addAll(transactionIds); + envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now); + }); + return null; + } + + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + tree.closeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); + } + + final void purge(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); + tree.purgeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); + } - abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod) - throws RequestException; + final void retire() { + transactions.values().forEach(FrontendTransaction::retire); + tree.removeTransactionChain(getIdentifier()); + } - abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod); + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) { + if (request instanceof CommitLocalTransactionRequest commitLocalRequest) { + LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); + return createReadyTransaction(id, commitLocalRequest.getModification()); + } + if (request instanceof AbstractReadTransactionRequest readTxRequest && readTxRequest.isSnapshotOnly()) { + LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id); + tree.getStats().incrementReadOnlyTransactionCount(); + return createOpenSnapshot(id); + } + + LOG.debug("{}: allocating new open transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); + return createOpenTransaction(id); + } + + abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id); + + abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id); + + abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod); + + abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod, + Exception failure); + + abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod, + Optional> participatingShardNames); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).omitNullValues() + .add("identifier", getIdentifier()) + .add("persistenceId", persistenceId) + .add("transactions", transactions) + .toString(); + } }