X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=31bf000ac2123a0bcc0c573fbedc84fba7247bc3;hb=2dedb8231e13abe55d6b75eb532d23dbe536e168;hp=e437b07c64bc7b7d9fdff19b5641bdce550dd49a;hpb=19a6bcd20358c883478ee3b82e67cb147113f1c0;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index e437b07c64..31bf000ac2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.UnsignedLong; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -24,6 +25,8 @@ import org.opendaylight.controller.cluster.access.commands.DeadTransactionExcept import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; @@ -32,9 +35,9 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongSet; +import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +51,7 @@ abstract class AbstractFrontendHistory implements Identifiable transactions = new HashMap<>(); - private final UnsignedLongSet purgedTransactions; + private final MutableUnsignedLongSet purgedTransactions; private final String persistenceId; private final ShardDataTree tree; @@ -59,7 +62,7 @@ abstract class AbstractFrontendHistory implements Identifiable closedTransactions; AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree, - final Map closedTransactions, final UnsignedLongSet purgedTransactions) { + final Map closedTransactions, final MutableUnsignedLongSet purgedTransactions) { this.persistenceId = requireNonNull(persistenceId); this.tree = requireNonNull(tree); this.closedTransactions = requireNonNull(closedTransactions); @@ -77,7 +80,9 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { if (request instanceof TransactionPurgeRequest) { - return handleTransactionPurgeRequest(request, envelope, now); + return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now); + } else if (request instanceof SkipTransactionsRequest) { + return handleSkipTransactionsRequest((SkipTransactionsRequest) request, envelope, now); } final TransactionIdentifier id = request.getTarget(); @@ -117,7 +122,7 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionPurgeRequest(final TransactionRequest request, + private TransactionPurgeResponse handleTransactionPurgeRequest(final TransactionPurgeRequest request, final RequestEnvelope envelope, final long now) { final TransactionIdentifier id = request.getTarget(); final long txidBits = id.getTransactionId(); @@ -164,6 +169,43 @@ abstract class AbstractFrontendHistory implements Identifiable(others.size() + 1); + ids.add(UnsignedLong.fromLongBits(first.getTransactionId())); + ids.addAll(others); + + final var it = ids.iterator(); + while (it.hasNext()) { + final var id = it.next(); + final long bits = id.longValue(); + if (purgedTransactions.contains(bits)) { + LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id); + it.remove(); + } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) { + LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id); + it.remove(); + } + } + + if (ids.isEmpty()) { + LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier()); + return new SkipTransactionsResponse(first, now); + } + + final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray()) + .immutableCopy(); + LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges()); + + tree.skipTransactions(getIdentifier(), transactionIds, () -> { + purgedTransactions.addAll(transactionIds); + envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now); + }); + return null; + } + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); tree.closeTransactionChain(getIdentifier(), @@ -203,8 +245,7 @@ abstract class AbstractFrontendHistory implements Identifiable> participatingShardNames); @Override - public String toString() { - return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier()) - .add("persistenceId", persistenceId).add("transactions", transactions).toString(); + public final String toString() { + return MoreObjects.toStringHelper(this).omitNullValues() + .add("identifier", getIdentifier()) + .add("persistenceId", persistenceId) + .add("transactions", transactions) + .toString(); } }