X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=22536cc50af0199cda53a09b72b59f83da609eff;hp=b7bd46c738d8a6fa5f55ecb09933048592b27c30;hb=3402cfce32b05957219e54754dd7ca5b0a54cd0e;hpb=43130cfeb2a1ac9f733ac8a777cabb36ff1277af

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
index b7bd46c738..22536cc50a 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
@@ -7,20 +7,21 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import javax.annotation.Nullable;
+import java.util.SortedSet;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
@@ -31,6 +32,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
@@ -44,10 +46,9 @@ import org.slf4j.LoggerFactory;
  */
 abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);
-    private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
-    private final RangeSet<UnsignedLong> purgedTransactions;
+    private final MutableUnsignedLongSet purgedTransactions;
     private final String persistenceId;
     private final ShardDataTree tree;
 
@@ -58,11 +59,11 @@ abstract class AbstractFrontendHistory implements Identifiable
     private Map<UnsignedLong, Boolean> closedTransactions;
 
     AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
-            final Map<UnsignedLong, Boolean> closedTransactions, final RangeSet<UnsignedLong> purgedTransactions) {
-        this.persistenceId = Preconditions.checkNotNull(persistenceId);
-        this.tree = Preconditions.checkNotNull(tree);
-        this.closedTransactions = Preconditions.checkNotNull(closedTransactions);
-        this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions);
+            final Map<UnsignedLong, Boolean> closedTransactions, final MutableUnsignedLongSet purgedTransactions) {
+        this.persistenceId = requireNonNull(persistenceId);
+        this.tree = requireNonNull(tree);
+        this.closedTransactions = requireNonNull(closedTransactions);
+        this.purgedTransactions = requireNonNull(purgedTransactions);
     }
 
     final String persistenceId() {
@@ -70,63 +71,25 @@ abstract class AbstractFrontendHistory implements Identifiable
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        final TransactionIdentifier id = request.getTarget();
-        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
-
         if (request instanceof TransactionPurgeRequest) {
-            if (purgedTransactions.contains(ul)) {
-                // Retransmitted purge request: nothing to do
-                LOG.debug("{}: transaction {} already purged", persistenceId, id);
-                return new TransactionPurgeResponse(id, request.getSequence());
-            }
-
-            // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
-            // to an ImmutableMap, which does not allow remove().
-            if (closedTransactions.containsKey(ul)) {
-                tree.purgeTransaction(id, () -> {
-                    closedTransactions.remove(ul);
-                    if (closedTransactions.isEmpty()) {
-                        closedTransactions = ImmutableMap.of();
-                    }
-
-                    purgedTransactions.add(Range.singleton(ul));
-                    LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
-                    envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
-                });
-                return null;
-            }
-
-            final FrontendTransaction tx = transactions.get(id);
-            if (tx == null) {
-                // This should never happen because the purge callback removes the transaction and puts it into
-                // purged transactions in one go. If it does, we warn about the situation and
-                LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
-                    id, purgedTransactions);
-                purgedTransactions.add(Range.singleton(ul));
-                return new TransactionPurgeResponse(id, request.getSequence());
-            }
-
-            tree.purgeTransaction(id, () -> {
-                purgedTransactions.add(Range.singleton(ul));
-                transactions.remove(id);
-                LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
-                envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
-            });
-            return null;
+            return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now);
         }
 
-        if (purgedTransactions.contains(ul)) {
+        final TransactionIdentifier id = request.getTarget();
+        final long txidBits = id.getTransactionId();
+        if (purgedTransactions.contains(txidBits)) {
             LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request,
                 purgedTransactions);
-            throw new DeadTransactionException(purgedTransactions);
+            throw new DeadTransactionException(purgedTransactions.toRangeSet());
         }
-        final Boolean closed = closedTransactions.get(ul);
+
+        final Boolean closed = closedTransactions.get(UnsignedLong.fromLongBits(txidBits));
         if (closed != null) {
-            final boolean successful = closed.booleanValue();
+            final boolean successful = closed;
             LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request,
                 successful ? "successful" : "failed");
             throw new ClosedTransactionException(successful);
@@ -136,13 +99,13 @@ abstract class AbstractFrontendHistory implements Identifiable
             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
             if (maybeReplay.isPresent()) {
                 final TransactionSuccess<?> replay = maybeReplay.get();
@@ -154,31 +117,81 @@ abstract class AbstractFrontendHistory implements Identifiable
+        final TransactionIdentifier id = request.getTarget();
+        final long txidBits = id.getTransactionId();
+        if (purgedTransactions.contains(txidBits)) {
+            // Retransmitted purge request: nothing to do
+            LOG.debug("{}: transaction {} already purged", persistenceId, id);
+            return new TransactionPurgeResponse(id, request.getSequence());
+        }
+
+        // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
+        // to an ImmutableMap, which does not allow remove().
+        final UnsignedLong ul = UnsignedLong.fromLongBits(txidBits);
+        if (closedTransactions.containsKey(ul)) {
+            tree.purgeTransaction(id, () -> {
+                closedTransactions.remove(ul);
+                if (closedTransactions.isEmpty()) {
+                    closedTransactions = ImmutableMap.of();
+                }
+
+                purgedTransactions.add(txidBits);
+                LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
+                envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+            });
+            return null;
+        }
+
+        final FrontendTransaction tx = transactions.get(id);
+        if (tx == null) {
+            // This should never happen because the purge callback removes the transaction and puts it into
+            // purged transactions in one go. If it does, we warn about the situation and
+            LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
+                id, purgedTransactions);
+            purgedTransactions.add(txidBits);
+            return new TransactionPurgeResponse(id, request.getSequence());
+        }
+
+        tree.purgeTransaction(id, () -> {
+            purgedTransactions.add(txidBits);
+            transactions.remove(id);
+            LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
+            envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+        });
+
+        return null;
+    }
+
+    final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
         tree.purgeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
-    private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
-            throws RequestException {
+    final void retire() {
+        transactions.values().forEach(FrontendTransaction::retire);
+        tree.removeTransactionChain(getIdentifier());
+    }
+
+    private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id) {
         if (request instanceof CommitLocalTransactionRequest) {
             LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
             tree.getStats().incrementReadWriteTransactionCount();
             return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
         }
-        if (request instanceof AbstractReadTransactionRequest) {
-            if (((AbstractReadTransactionRequest) request).isSnapshotOnly()) {
-                LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id);
-                tree.getStats().incrementReadOnlyTransactionCount();
-                return createOpenSnapshot(id);
-            }
+        if (request instanceof AbstractReadTransactionRequest
+                && ((AbstractReadTransactionRequest) request).isSnapshotOnly()) {
+            LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
+            tree.getStats().incrementReadOnlyTransactionCount();
+            return createOpenSnapshot(id);
         }
 
         LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
@@ -186,21 +199,24 @@ abstract class AbstractFrontendHistory implements Identifiable
+            Optional<SortedSet<String>> participatingShardNames);
 
     @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
-                .add("persistenceId", persistenceId).add("transactions", transactions).toString();
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).omitNullValues()
+            .add("identifier", getIdentifier())
+            .add("persistenceId", persistenceId)
+            .add("transactions", transactions)
+            .toString();
     }
 }
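
Note on the data-structure swap visible in this diff: the old code tracked purged transaction ids as boxed UnsignedLong values inside a Guava RangeSet, while the new code keeps raw long bits in MutableUnsignedLongSet and only produces a RangeSet view (purgedTransactions.toRangeSet()) when building a DeadTransactionException. The sketch below is illustrative only; SimplePurgedSet is a hypothetical stand-in that exposes just the add(long), contains(long) and toRangeSet() calls the diff relies on, not the real MutableUnsignedLongSet implementation from org.opendaylight.controller.cluster.datastore.utils.

    import com.google.common.collect.ImmutableRangeSet;
    import com.google.common.collect.Range;
    import com.google.common.collect.RangeSet;
    import com.google.common.collect.TreeRangeSet;
    import com.google.common.primitives.UnsignedLong;

    // Hypothetical stand-in for MutableUnsignedLongSet, showing the calling pattern used in the
    // new code: callers pass raw long bits, boxing to UnsignedLong happens only when needed.
    final class SimplePurgedSet {
        private final RangeSet<UnsignedLong> ranges = TreeRangeSet.create();

        void add(final long longBits) {
            // Mirrors the old purgedTransactions.add(Range.singleton(ul)) call sites
            ranges.add(Range.singleton(UnsignedLong.fromLongBits(longBits)));
        }

        boolean contains(final long longBits) {
            return ranges.contains(UnsignedLong.fromLongBits(longBits));
        }

        RangeSet<UnsignedLong> toRangeSet() {
            // Boxed view, only needed when constructing something like DeadTransactionException
            return ImmutableRangeSet.copyOf(ranges);
        }
    }

In the patched class the same three calls appear as purgedTransactions.add(txidBits), purgedTransactions.contains(txidBits) and purgedTransactions.toRangeSet(); the long-based interface avoids allocating an UnsignedLong and a singleton Range on every purge bookkeeping operation.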