X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=5a5e42637e6a5a4908a23dd85df8abd88589967b;hb=refs%2Fchanges%2F36%2F60436%2F1;hp=7e5addaefd5b8f8180ad4f39300af7b2f9b1e549;hpb=88ba1506af44d1e9f1252f155c27a1309607477d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 7e5addaefd..5a5e42637e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -9,11 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import com.google.common.primitives.UnsignedLong; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -32,6 +29,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +49,7 @@ final class LeaderFrontendState implements Identifiable { private final Map localHistories; // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories; + private final UnsignedLongRangeSet purgedHistories; // Used for all standalone transactions private final AbstractFrontendHistory standaloneHistory; @@ -71,12 +70,12 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId, - clientId, tree), new HashMap<>()); + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); } LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, - final RangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, final Map localHistories) { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); @@ -133,9 +132,9 @@ final class LeaderFrontendState implements Identifiable { // We have not found the history. Before we create it we need to check history ID sequencing so that we do not // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) { + if (purgedHistories.contains(historyId.getHistoryId())) { LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(purgedHistories); + throw new DeadHistoryException(purgedHistories.toImmutable()); } // Update last history we have seen @@ -146,7 +145,7 @@ final class LeaderFrontendState implements Identifiable { // We have to send the response only after persistence has completed final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> { LOG.debug("{}: persisted history {}", persistenceId, historyId); - envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now); }); localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain)); @@ -179,7 +178,7 @@ final class LeaderFrontendState implements Identifiable { } LOG.debug("{}: purging history {}", persistenceId, id); - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + purgedHistories.add(id.getHistoryId()); existing.purge(request.getSequence(), envelope, now); return null; } @@ -195,9 +194,9 @@ final class LeaderFrontendState implements Identifiable { if (lhId.getHistoryId() != 0) { history = localHistories.get(lhId); if (history == null) { - if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) { + if (purgedHistories.contains(lhId.getHistoryId())) { LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); - throw new DeadHistoryException(purgedHistories); + throw new DeadHistoryException(purgedHistories.toImmutable()); } LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); @@ -218,7 +217,25 @@ final class LeaderFrontendState implements Identifiable { } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); } @Override