X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=022bb7aa07e2016670d9fc4fd36e006116655322;hb=abeaf223cadd818e2054b516e39c20305ea144b8;hp=e136d50e02fc0c60f1eb3c0a44f824aed0b77615;hpb=127042ea7e148d9dc0282acc3780b4754ca69e12;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index e136d50e02..022bb7aa07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -7,12 +7,12 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; import com.google.common.primitives.UnsignedLong; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -25,6 +25,8 @@ import org.opendaylight.controller.cluster.access.commands.DeadTransactionExcept import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; @@ -33,6 +35,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; @@ -48,7 +51,7 @@ abstract class AbstractFrontendHistory implements Identifiable transactions = new HashMap<>(); - private final RangeSet purgedTransactions; + private final MutableUnsignedLongSet purgedTransactions; private final String persistenceId; private final ShardDataTree tree; @@ -59,11 +62,11 @@ abstract class AbstractFrontendHistory implements Identifiable closedTransactions; AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree, - final Map closedTransactions, final RangeSet purgedTransactions) { - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.tree = Preconditions.checkNotNull(tree); - this.closedTransactions = Preconditions.checkNotNull(closedTransactions); - this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions); + final Map closedTransactions, final MutableUnsignedLongSet purgedTransactions) { + this.persistenceId = requireNonNull(persistenceId); + this.tree = requireNonNull(tree); + this.closedTransactions = requireNonNull(closedTransactions); + this.purgedTransactions = requireNonNull(purgedTransactions); } final String persistenceId() { @@ -77,18 +80,21 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof TransactionPurgeRequest) { - return handleTransactionPurgeRequest(request, envelope, now); + return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now); + } else if (request instanceof SkipTransactionsRequest) { + return handleSkipTransactionsRequest((SkipTransactionsRequest) request, envelope, now); } final TransactionIdentifier id = request.getTarget(); - final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); - if (purgedTransactions.contains(ul)) { + final long txidBits = id.getTransactionId(); + if (purgedTransactions.contains(txidBits)) { LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions); - throw new DeadTransactionException(purgedTransactions); + throw new DeadTransactionException(purgedTransactions.toRangeSet()); } - final Boolean closed = closedTransactions.get(ul); + + final Boolean closed = closedTransactions.get(UnsignedLong.fromLongBits(txidBits)); if (closed != null) { - final boolean successful = closed.booleanValue(); + final boolean successful = closed; LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful" : "failed"); throw new ClosedTransactionException(successful); @@ -116,11 +122,11 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionPurgeRequest(final TransactionRequest request, + private TransactionPurgeResponse handleTransactionPurgeRequest(final TransactionPurgeRequest request, final RequestEnvelope envelope, final long now) { final TransactionIdentifier id = request.getTarget(); - final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); - if (purgedTransactions.contains(ul)) { + final long txidBits = id.getTransactionId(); + if (purgedTransactions.contains(txidBits)) { // Retransmitted purge request: nothing to do LOG.debug("{}: transaction {} already purged", persistenceId, id); return new TransactionPurgeResponse(id, request.getSequence()); @@ -128,6 +134,7 @@ abstract class AbstractFrontendHistory implements Identifiable { closedTransactions.remove(ul); @@ -135,7 +142,7 @@ abstract class AbstractFrontendHistory implements Identifiable { - purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + purgedTransactions.add(txidBits); transactions.remove(id); LOG.debug("{}: finished purging transaction {}", persistenceId(), id); envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); @@ -162,6 +169,43 @@ abstract class AbstractFrontendHistory implements Identifiable(others.size() + 1); + ids.add(UnsignedLong.fromLongBits(first.getTransactionId())); + ids.addAll(others); + + final var it = ids.iterator(); + while (it.hasNext()) { + final var id = it.next(); + final long bits = id.longValue(); + if (purgedTransactions.contains(bits)) { + LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id); + it.remove(); + } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) { + LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id); + it.remove(); + } + } + + if (ids.isEmpty()) { + LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier()); + return new SkipTransactionsResponse(first, now); + } + + final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray()) + .immutableCopy(); + LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges()); + + tree.skipTransactions(getIdentifier(), transactionIds, () -> { + purgedTransactions.addAll(transactionIds); + envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now); + }); + return null; + } + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); tree.closeTransactionChain(getIdentifier(), @@ -201,8 +245,7 @@ abstract class AbstractFrontendHistory implements Identifiable> participatingShardNames); @Override - public String toString() { - return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier()) - .add("persistenceId", persistenceId).add("transactions", transactions).toString(); + public final String toString() { + return MoreObjects.toStringHelper(this).omitNullValues() + .add("identifier", getIdentifier()) + .add("persistenceId", persistenceId) + .add("transactions", transactions) + .toString(); } }