X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=c1d2db309410dd831a31b3ec4845f02534749c9b;hb=refs%2Fchanges%2F36%2F60436%2F1;hp=dbea3a1cbb5cd00058c0bcf38dff6ce7f0b1e1b1;hpb=642f83ada35fca9ed7a8f242b90e41740803ddfa;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index dbea3a1cbb..c1d2db3094 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -9,13 +9,20 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; @@ -38,15 +45,24 @@ import org.slf4j.LoggerFactory; */ abstract class AbstractFrontendHistory implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class); - private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0); private final Map transactions = new HashMap<>(); + private final RangeSet purgedTransactions; private final String persistenceId; - private final Ticker ticker; + private final ShardDataTree tree; - AbstractFrontendHistory(final String persistenceId, final Ticker ticker) { + /** + * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or + * aborted (false). We only ever shrink these. + */ + private Map closedTransactions; + + AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree, + final Map closedTransactions, final RangeSet purgedTransactions) { this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.ticker = Preconditions.checkNotNull(ticker); + this.tree = Preconditions.checkNotNull(tree); + this.closedTransactions = Preconditions.checkNotNull(closedTransactions); + this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions); } final String persistenceId() { @@ -54,59 +70,124 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { final TransactionIdentifier id = request.getTarget(); + final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); - FrontendTransaction tx; if (request instanceof TransactionPurgeRequest) { - tx = transactions.remove(id); - if (tx == null) { - // We have no record of the transaction, nothing to do - LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id); + if (purgedTransactions.contains(ul)) { + // Retransmitted purge request: nothing to do + LOG.debug("{}: transaction {} already purged", persistenceId, id); return new TransactionPurgeResponse(id, request.getSequence()); } - } else { - tx = transactions.get(id); + + // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it + // to an ImmutableMap, which does not allow remove(). + if (closedTransactions.containsKey(ul)) { + tree.purgeTransaction(id, () -> { + closedTransactions.remove(ul); + if (closedTransactions.isEmpty()) { + closedTransactions = ImmutableMap.of(); + } + + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + final FrontendTransaction tx = transactions.get(id); if (tx == null) { - // The transaction does not exist and we are about to create it, check sequence number - if (request.getSequence() != 0) { - LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); - throw UNSEQUENCED_START; - } - - tx = createTransaction(request, id); - transactions.put(id, tx); - } else { - final Optional> maybeReplay = tx.replaySequence(request.getSequence()); - if (maybeReplay.isPresent()) { - final TransactionSuccess replay = maybeReplay.get(); - LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); - return replay; - } + // This should never happen because the purge callback removes the transaction and puts it into + // purged transactions in one go. If it does, we warn about the situation and + LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId, + id, purgedTransactions); + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + return new TransactionPurgeResponse(id, request.getSequence()); + } + + tree.purgeTransaction(id, () -> { + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + transactions.remove(id); + LOG.debug("{}: finished purging transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + if (purgedTransactions.contains(ul)) { + LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions); + throw new DeadTransactionException(purgedTransactions); + } + final Boolean closed = closedTransactions.get(ul); + if (closed != null) { + final boolean successful = closed.booleanValue(); + LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful" + : "failed"); + throw new ClosedTransactionException(successful); + } + + FrontendTransaction tx = transactions.get(id); + if (tx == null) { + // The transaction does not exist and we are about to create it, check sequence number + if (request.getSequence() != 0) { + LOG.warn("{}: no transaction state present, unexpected request {}", persistenceId(), request); + throw new OutOfOrderRequestException(0); + } + + tx = createTransaction(request, id); + transactions.put(id, tx); + } else if (!(request instanceof IncrementTransactionSequenceRequest)) { + final Optional> maybeReplay = tx.replaySequence(request.getSequence()); + if (maybeReplay.isPresent()) { + final TransactionSuccess replay = maybeReplay.get(); + LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); + return replay; } } return tx.handleRequest(request, envelope, now); } + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + tree.closeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); + } + + final void purge(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); + tree.purgeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); + } + + final void retire() { + transactions.values().forEach(FrontendTransaction::retire); + tree.removeTransactionChain(getIdentifier()); + } + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) throws RequestException { if (request instanceof CommitLocalTransactionRequest) { LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); } if (request instanceof AbstractReadTransactionRequest) { if (((AbstractReadTransactionRequest) request).isSnapshotOnly()) { - LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id); + LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id); + tree.getStats().incrementReadOnlyTransactionCount(); return createOpenSnapshot(id); } } LOG.debug("{}: allocating new open transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); return createOpenTransaction(id); } @@ -117,6 +198,9 @@ abstract class AbstractFrontendHistory implements Identifiable