X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=01fc35c395656896ba59e668134429875cc13508;hb=546cd1fd100dbaa36908b22c2f422320dbd8c4b2;hp=f23a8ffe98abcec519fd9ed73f006c22a2417553;hpb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index f23a8ffe98..01fc35c395 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -7,8 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; @@ -16,11 +17,13 @@ import com.google.common.primitives.UnsignedLong; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import javax.annotation.Nullable; +import java.util.SortedSet; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; @@ -44,7 +47,6 @@ import org.slf4j.LoggerFactory; */ abstract class AbstractFrontendHistory implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class); - private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0); private final Map transactions = new HashMap<>(); private final RangeSet purgedTransactions; @@ -59,10 +61,10 @@ abstract class AbstractFrontendHistory implements Identifiable closedTransactions, final RangeSet purgedTransactions) { - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.tree = Preconditions.checkNotNull(tree); - this.closedTransactions = Preconditions.checkNotNull(closedTransactions); - this.purgedTransactions = Preconditions.checkNotNull(purgedTransactions); + this.persistenceId = requireNonNull(persistenceId); + this.tree = requireNonNull(tree); + this.closedTransactions = requireNonNull(closedTransactions); + this.purgedTransactions = requireNonNull(purgedTransactions); } final String persistenceId() { @@ -70,56 +72,17 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - final TransactionIdentifier id = request.getTarget(); - final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); - if (request instanceof TransactionPurgeRequest) { - if (purgedTransactions.contains(ul)) { - // Retransmitted purge request: nothing to do - LOG.debug("{}: transaction {} already purged", persistenceId, id); - return new TransactionPurgeResponse(id, request.getSequence()); - } - - // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it - // to an ImmutableMap, which does not allow remove(). - if (closedTransactions.containsKey(ul)) { - tree.purgeTransaction(id, () -> { - closedTransactions.remove(ul); - if (closedTransactions.isEmpty()) { - closedTransactions = ImmutableMap.of(); - } - - purgedTransactions.add(Range.singleton(ul)); - LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id); - envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); - }); - return null; - } - - final FrontendTransaction tx = transactions.get(id); - if (tx == null) { - // This should never happen because the purge callback removes the transaction and puts it into - // purged transactions in one go. If it does, we warn about the situation and - LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId, - id, purgedTransactions); - purgedTransactions.add(Range.singleton(ul)); - return new TransactionPurgeResponse(id, request.getSequence()); - } - - tx.purge(() -> { - purgedTransactions.add(Range.singleton(ul)); - transactions.remove(id); - LOG.debug("{}: finished purging transaction {}", persistenceId(), id); - envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); - }); - return null; + return handleTransactionPurgeRequest(request, envelope, now); } + final TransactionIdentifier id = request.getTarget(); + final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); if (purgedTransactions.contains(ul)) { LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions); throw new DeadTransactionException(purgedTransactions); @@ -136,13 +99,13 @@ abstract class AbstractFrontendHistory implements Identifiable> maybeReplay = tx.replaySequence(request.getSequence()); if (maybeReplay.isPresent()) { final TransactionSuccess replay = maybeReplay.get(); @@ -154,45 +117,99 @@ abstract class AbstractFrontendHistory implements Identifiable { - envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); + private TransactionSuccess handleTransactionPurgeRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) { + final TransactionIdentifier id = request.getTarget(); + final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId()); + if (purgedTransactions.contains(ul)) { + // Retransmitted purge request: nothing to do + LOG.debug("{}: transaction {} already purged", persistenceId, id); + return new TransactionPurgeResponse(id, request.getSequence()); + } + + // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it + // to an ImmutableMap, which does not allow remove(). + if (closedTransactions.containsKey(ul)) { + tree.purgeTransaction(id, () -> { + closedTransactions.remove(ul); + if (closedTransactions.isEmpty()) { + closedTransactions = ImmutableMap.of(); + } + + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); + }); + return null; + } + + final FrontendTransaction tx = transactions.get(id); + if (tx == null) { + // This should never happen because the purge callback removes the transaction and puts it into + // purged transactions in one go. If it does, we warn about the situation and + LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId, + id, purgedTransactions); + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + return new TransactionPurgeResponse(id, request.getSequence()); + } + + tree.purgeTransaction(id, () -> { + purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); + transactions.remove(id); + LOG.debug("{}: finished purging transaction {}", persistenceId(), id); + envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now); }); + + return null; } - void purge(final long sequence, final RequestEnvelope envelope, final long now) { + final void destroy(final long sequence, final RequestEnvelope envelope, final long now) { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + tree.closeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); + } + + final void purge(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); - tree.purgeTransactionChain(getIdentifier(), () -> { - envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); - }); + tree.purgeTransactionChain(getIdentifier(), + () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); } - private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) - throws RequestException { + final void retire() { + transactions.values().forEach(FrontendTransaction::retire); + tree.removeTransactionChain(getIdentifier()); + } + + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) { if (request instanceof CommitLocalTransactionRequest) { LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); } - if (request instanceof AbstractReadTransactionRequest) { - if (((AbstractReadTransactionRequest) request).isSnapshotOnly()) { - LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id); - return createOpenSnapshot(id); - } + if (request instanceof AbstractReadTransactionRequest + && ((AbstractReadTransactionRequest) request).isSnapshotOnly()) { + LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id); + tree.getStats().incrementReadOnlyTransactionCount(); + return createOpenSnapshot(id); } LOG.debug("{}: allocating new open transaction {}", persistenceId(), id); + tree.getStats().incrementReadWriteTransactionCount(); return createOpenTransaction(id); } - abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException; + abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id); - abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException; + abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id); abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod) - throws RequestException; + ; + + abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod, + Exception failure); - abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod); + abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod, + Optional> participatingShardNames); @Override public String toString() {