X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=295dbdc0058a129193bb83464f3b40a9b9dc852e;hp=8704f2ab0cd9d33e02ab6b53a65d1aa5733fc60b;hb=ccca30bbb1545643c427fc59c23329c5d49f8d4b;hpb=b6904860c1fff9e5d05767ef5b41e103b26872c0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 8704f2ab0c..295dbdc005 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -7,16 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import com.google.common.primitives.UnsignedLong; +import com.google.common.base.MoreObjects.ToStringHelper; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; @@ -32,34 +30,218 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing - * in the frontend/backend conversation. + * in the frontend/backend conversation. This class is NOT thread-safe. * * @author Robert Varga */ -@NotThreadSafe -final class LeaderFrontendState implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); +abstract class LeaderFrontendState implements Identifiable { + static final class Disabled extends LeaderFrontendState { + Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + super(persistenceId, clientId, tree); + } + + @Override + LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } + + @Override + TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } + } + + static final class Enabled extends LeaderFrontendState { + // Histories which have not been purged + private final Map localHistories; + + // RangeSet performs automatic merging, hence we keep minimal state tracking information + private final UnsignedLongRangeSet purgedHistories; + + // Used for all standalone transactions + private final AbstractFrontendHistory standaloneHistory; + + private long expectedTxSequence; + private Long lastSeenHistory = null; + + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); + } + + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { + super(persistenceId, clientId, tree); + this.purgedHistories = requireNonNull(purgedHistories); + this.standaloneHistory = requireNonNull(standaloneHistory); + this.localHistories = requireNonNull(localHistories); + } + + @Override + @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + if (request instanceof CreateLocalHistoryRequest) { + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); + } else if (request instanceof DestroyLocalHistoryRequest) { + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); + } else if (request instanceof PurgeLocalHistoryRequest) { + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); + } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); + throw new UnsupportedRequestException(request); + } + } finally { + expectNextRequest(); + } + } + + @Override + @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); + final AbstractFrontendHistory history; + + if (lhId.getHistoryId() != 0) { + history = localHistories.get(lhId); + if (history == null) { + if (purgedHistories.contains(lhId.getHistoryId())) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request); + throw new UnknownHistoryException(lastSeenHistory); + } + } else { + history = standaloneHistory; + } + + return history.handleTransactionRequest(request, envelope, now); + } finally { + expectNextRequest(); + } + } + + @Override + void reconnect() { + expectedTxSequence = 0; + super.reconnect(); + } + + @Override + void retire() { + super.retire(); + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories); + } + + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); + if (existing != null) { + // History already exists: report success + LOG.debug("{}: history {} already exists", persistenceId(), historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); + } + + // We have not found the history. Before we create it we need to check history ID sequencing so that we do + // not end up resurrecting a purged history. + if (purgedHistories.contains(historyId.getHistoryId())) { + LOG.debug("{}: rejecting purged request {}", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + // Update last history we have seen + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); + } + + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId(), historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), + tree().readTime() - now); + }); - // Histories which have not been purged - private final Map localHistories; + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain)); + LOG.debug("{}: created history {}", persistenceId(), historyId); + return null; + } + + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.get(id); + if (existing == null) { + // History does not exist: report success + LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + existing.destroy(request.getSequence(), envelope, now); + return null; + } + + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.remove(id); + if (existing == null) { + LOG.debug("{}: history {} has already been purged", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + LOG.debug("{}: purging history {}", persistenceId(), id); + purgedHistories.add(id.getHistoryId()); + existing.purge(request.getSequence(), envelope, now); + return null; + } + + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { + if (expectedTxSequence != envelope.getTxSequence()) { + throw new OutOfSequenceEnvelopeException(expectedTxSequence); + } + } + + private void expectNextRequest() { + expectedTxSequence++; + } + } - // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories; + private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); - // Used for all standalone transactions - private final AbstractFrontendHistory standaloneHistory; private final ShardDataTree tree; private final ClientIdentifier clientId; private final String persistenceId; - private long expectedTxSequence; - private Long lastSeenHistory = null; + private long lastConnectTicks; + private long lastSeenTicks; // TODO: explicit failover notification // Record the ActorRef for the originating actor and when we switch to being a leader send a notification @@ -71,152 +253,70 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId, - clientId, tree), new HashMap<>()); - } - - LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, - final RangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, - final Map localHistories) { - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.clientId = Preconditions.checkNotNull(clientId); - this.tree = Preconditions.checkNotNull(tree); - this.purgedHistories = Preconditions.checkNotNull(purgedHistories); - this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory); - this.localHistories = Preconditions.checkNotNull(localHistories); + this.persistenceId = requireNonNull(persistenceId); + this.clientId = requireNonNull(clientId); + this.tree = requireNonNull(tree); + this.lastSeenTicks = tree.readTime(); } @Override - public ClientIdentifier getIdentifier() { + public final ClientIdentifier getIdentifier() { return clientId; } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { - if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfSequenceEnvelopeException(expectedTxSequence); - } + final String persistenceId() { + return persistenceId; } - private void expectNextRequest() { - expectedTxSequence++; - } - - @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); - - try { - if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request); - } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); - } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); - } else { - LOG.warn("{}: rejecting unsupported request {}", persistenceId, request); - throw new UnsupportedRequestException(request); - } - } finally { - expectNextRequest(); - } + final long getLastConnectTicks() { + return lastConnectTicks; } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(id); - if (existing != null) { - // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } - - // We have not found the history. Before we create it we need to check history ID sequencing so that we do not - // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { - LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(purgedHistories); - } - - // Update last history we have seen - if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { - lastSeenHistory = id.getHistoryId(); - } - - localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id)); - LOG.debug("{}: created history {}", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + final long getLastSeenTicks() { + return lastSeenTicks; } - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) - throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.get(id); - if (existing == null) { - // History does not exist: report success - LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } - - existing.destroy(request.getSequence(), envelope, now); - return null; + final ShardDataTree tree() { + return tree; } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.remove(id); - if (existing == null) { - LOG.debug("{}: history {} has already been purged", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } - - LOG.debug("{}: purging history {}", persistenceId, id); - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); - existing.purge(request.getSequence(), envelope, now); - return null; + final void touch() { + this.lastSeenTicks = tree.readTime(); } - @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); + abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest request, + RequestEnvelope envelope, long now) throws RequestException; - try { - final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); - final AbstractFrontendHistory history; + abstract @Nullable TransactionSuccess handleTransactionRequest(TransactionRequest request, + RequestEnvelope envelope, long now) throws RequestException; - if (lhId.getHistoryId() != 0) { - history = localHistories.get(lhId); - if (history == null) { - if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) { - LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); - throw new DeadHistoryException(purgedHistories); - } + void reconnect() { + lastConnectTicks = tree.readTime(); + } - LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); - throw new UnknownHistoryException(lastSeenHistory); + void retire() { + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); } - } else { - history = standaloneHistory; } - - return history.handleTransactionRequest(request, envelope, now); - } finally { - expectNextRequest(); } } - void reconnect() { - expectedTxSequence = 0; - } - - void retire() { - // FIXME: flush all state + @Override + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); } - @Override - public String toString() { - return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId) - .add("purgedHistories", purgedHistories).toString(); + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks); } }