X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=5a5e42637e6a5a4908a23dd85df8abd88589967b;hb=refs%2Fchanges%2F36%2F60436%2F1;hp=3c65b799d1831c498c6321df878305fb2c56a737;hpb=5fd8e6506248cc34da72281a1662612f6c2b2f9a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 3c65b799d1..5a5e42637e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -9,11 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import com.google.common.primitives.UnsignedLong; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -22,7 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; -import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -32,6 +29,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +46,10 @@ final class LeaderFrontendState implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); // Histories which have not been purged - private final Map localHistories = new HashMap<>(); + private final Map localHistories; // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories = TreeRangeSet.create(); + private final UnsignedLongRangeSet purgedHistories; // Used for all standalone transactions private final AbstractFrontendHistory standaloneHistory; @@ -61,7 +60,6 @@ final class LeaderFrontendState implements Identifiable { private long expectedTxSequence; private Long lastSeenHistory = null; - // TODO: explicit failover notification // Record the ActorRef for the originating actor and when we switch to being a leader send a notification // to the frontend client -- that way it can immediately start sending requests @@ -72,10 +70,19 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); + } + + LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); this.tree = Preconditions.checkNotNull(tree); - standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree); + this.purgedHistories = Preconditions.checkNotNull(purgedHistories); + this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory); + this.localHistories = Preconditions.checkNotNull(localHistories); } @Override @@ -83,9 +90,9 @@ final class LeaderFrontendState implements Identifiable { return clientId; } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException { + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfOrderRequestException(expectedTxSequence); + throw new OutOfSequenceEnvelopeException(expectedTxSequence); } } @@ -94,17 +101,18 @@ final class LeaderFrontendState implements Identifiable { } @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { checkRequestSequence(envelope); try { if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request); + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request); + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request); + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId, request); throw new UnsupportedRequestException(request); } } finally { @@ -112,33 +120,42 @@ final class LeaderFrontendState implements Identifiable { } } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(id); + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); if (existing != null) { // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: history {} already exists", persistenceId, historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); } // We have not found the history. Before we create it we need to check history ID sequencing so that we do not // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { + if (purgedHistories.contains(historyId.getHistoryId())) { LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(lastSeenHistory.longValue()); + throw new DeadHistoryException(purgedHistories.toImmutable()); } // Update last history we have seen - if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { - lastSeenHistory = id.getHistoryId(); + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id))); - LOG.debug("{}: created history {}", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId, historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now); + }); + + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain)); + LOG.debug("{}: created history {}", persistenceId, historyId); + return null; } - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException { + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) + throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.get(id); if (existing == null) { @@ -147,32 +164,27 @@ final class LeaderFrontendState implements Identifiable { return new LocalHistorySuccess(id, request.getSequence()); } - return existing.destroy(request.getSequence()); + existing.destroy(request.getSequence(), envelope, now); + return null; } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException { + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.remove(id); - if (existing != null) { - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); - - if (!existing.isDestroyed()) { - LOG.warn("{}: purging undestroyed history {}", persistenceId, id); - existing.destroy(request.getSequence()); - } - - // FIXME: record a PURGE tombstone in the journal - - LOG.debug("{}: purged history {}", persistenceId, id); - } else { + if (existing == null) { LOG.debug("{}: history {} has already been purged", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); } - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: purging history {}", persistenceId, id); + purgedHistories.add(id.getHistoryId()); + existing.purge(request.getSequence(), envelope, now); + return null; } @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { checkRequestSequence(envelope); try { @@ -182,14 +194,19 @@ final class LeaderFrontendState implements Identifiable { if (lhId.getHistoryId() != 0) { history = localHistories.get(lhId); if (history == null) { - LOG.debug("{}: rejecting unknown history request {}", persistenceId, request); + if (purgedHistories.contains(lhId.getHistoryId())) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); throw new UnknownHistoryException(lastSeenHistory); } } else { history = standaloneHistory; } - return history.handleTransactionRequest(request, envelope); + return history.handleTransactionRequest(request, envelope, now); } finally { expectNextRequest(); } @@ -200,7 +217,25 @@ final class LeaderFrontendState implements Identifiable { } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); } @Override