X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=7611b029ca53a4562753d7346f86db26daef02db;hb=5dbaf1259ead1904536db204bbc742a3359c1eb1;hp=409268501a4f5e6914b0ca61f7aadcebe8c35037;hpb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 409268501a..7611b029ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -9,11 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import com.google.common.primitives.UnsignedLong; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -22,7 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; -import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -32,6 +29,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +49,7 @@ final class LeaderFrontendState implements Identifiable { private final Map localHistories; // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories; + private final UnsignedLongRangeSet purgedHistories; // Used for all standalone transactions private final AbstractFrontendHistory standaloneHistory; @@ -58,6 +57,8 @@ final class LeaderFrontendState implements Identifiable { private final ClientIdentifier clientId; private final String persistenceId; + private long lastConnectTicks; + private long lastSeenTicks; private long expectedTxSequence; private Long lastSeenHistory = null; @@ -71,12 +72,12 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId, - clientId, tree), new HashMap<>()); + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); } LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, - final RangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, final Map localHistories) { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); @@ -84,6 +85,7 @@ final class LeaderFrontendState implements Identifiable { this.purgedHistories = Preconditions.checkNotNull(purgedHistories); this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory); this.localHistories = Preconditions.checkNotNull(localHistories); + this.lastSeenTicks = tree.readTime(); } @Override @@ -91,9 +93,9 @@ final class LeaderFrontendState implements Identifiable { return clientId; } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException { + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfOrderRequestException(expectedTxSequence); + throw new OutOfSequenceEnvelopeException(expectedTxSequence); } } @@ -107,12 +109,13 @@ final class LeaderFrontendState implements Identifiable { try { if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request); + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); } else if (request instanceof DestroyLocalHistoryRequest) { return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); } else if (request instanceof PurgeLocalHistoryRequest) { return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId, request); throw new UnsupportedRequestException(request); } } finally { @@ -120,35 +123,41 @@ final class LeaderFrontendState implements Identifiable { } } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(id); + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); if (existing != null) { // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: history {} already exists", persistenceId, historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); } // We have not found the history. Before we create it we need to check history ID sequencing so that we do not // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { + if (purgedHistories.contains(historyId.getHistoryId())) { LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(lastSeenHistory.longValue()); + throw new DeadHistoryException(purgedHistories.toImmutable()); } // Update last history we have seen - if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { - lastSeenHistory = id.getHistoryId(); + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); } - localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id)); - LOG.debug("{}: created history {}", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId, historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now); + }); + + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain)); + LOG.debug("{}: created history {}", persistenceId, historyId); + return null; } private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) - throws RequestException { + final RequestEnvelope envelope, final long now) { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.get(id); if (existing == null) { @@ -162,7 +171,7 @@ final class LeaderFrontendState implements Identifiable { } private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { + final RequestEnvelope envelope, final long now) { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.remove(id); if (existing == null) { @@ -171,7 +180,7 @@ final class LeaderFrontendState implements Identifiable { } LOG.debug("{}: purging history {}", persistenceId, id); - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + purgedHistories.add(id.getHistoryId()); existing.purge(request.getSequence(), envelope, now); return null; } @@ -187,7 +196,12 @@ final class LeaderFrontendState implements Identifiable { if (lhId.getHistoryId() != 0) { history = localHistories.get(lhId); if (history == null) { - LOG.debug("{}: rejecting unknown history request {}", persistenceId, request); + if (purgedHistories.contains(lhId.getHistoryId())) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); throw new UnknownHistoryException(lastSeenHistory); } } else { @@ -202,15 +216,49 @@ final class LeaderFrontendState implements Identifiable { void reconnect() { expectedTxSequence = 0; + lastConnectTicks = tree.readTime(); } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); + } + + long getLastConnectTicks() { + return lastConnectTicks; + } + + long getLastSeenTicks() { + return lastSeenTicks; + } + + void touch() { + this.lastSeenTicks = tree.readTime(); } @Override public String toString() { - return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId) - .add("purgedHistories", purgedHistories).toString(); + return MoreObjects.toStringHelper(LeaderFrontendState.class) + .add("clientId", clientId) + .add("nanosAgo", tree.readTime() - lastSeenTicks) + .add("purgedHistories", purgedHistories) + .toString(); } }