X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=022bb7aa07e2016670d9fc4fd36e006116655322;hp=22536cc50af0199cda53a09b72b59f83da609eff;hb=refs%2Fchanges%2F49%2F85749%2F63;hpb=e131c3498d286ff14890120ff5e9020ba89f10f9 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 22536cc50a..022bb7aa07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.UnsignedLong; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -24,6 +25,8 @@ import org.opendaylight.controller.cluster.access.commands.DeadTransactionExcept import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest; +import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; @@ -78,6 +81,8 @@ abstract class AbstractFrontendHistory implements Identifiable(others.size() + 1); + ids.add(UnsignedLong.fromLongBits(first.getTransactionId())); + ids.addAll(others); + + final var it = ids.iterator(); + while (it.hasNext()) { + final var id = it.next(); + final long bits = id.longValue(); + if (purgedTransactions.contains(bits)) { + LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id); + it.remove(); + } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) { + LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id); + it.remove(); + } + } + + if (ids.isEmpty()) { + LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier()); + return new SkipTransactionsResponse(first, now); + } + + final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray()) + .immutableCopy(); + LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges()); + + tree.skipTransactions(getIdentifier(), transactionIds, () -> { + purgedTransactions.addAll(transactionIds); + envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now); + }); + return null; + } + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); tree.closeTransactionChain(getIdentifier(),