X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=295dbdc0058a129193bb83464f3b40a9b9dc852e;hb=546cd1fd100dbaa36908b22c2f422320dbd8c4b2;hp=297759b5c86ffd798b0612ce32e96fe6f7e6af25;hpb=cd2a6fa0d8fa6281be28d3c7b9828ecf4e932811;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 297759b5c8..295dbdc005 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -7,22 +7,20 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import com.google.common.primitives.UnsignedLong; +import com.google.common.base.MoreObjects.ToStringHelper; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; -import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -32,182 +30,293 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; +import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing - * in the frontend/backend conversation. + * in the frontend/backend conversation. This class is NOT thread-safe. * * @author Robert Varga */ -@NotThreadSafe -final class LeaderFrontendState implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); +abstract class LeaderFrontendState implements Identifiable { + static final class Disabled extends LeaderFrontendState { + Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + super(persistenceId, clientId, tree); + } - // Histories which have not been purged - private final Map localHistories = new HashMap<>(); + @Override + LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } - // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories = TreeRangeSet.create(); + @Override + TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } + } - // Used for all standalone transactions - private final AbstractFrontendHistory standaloneHistory; - private final ShardDataTree tree; - private final ClientIdentifier clientId; - private final String persistenceId; + static final class Enabled extends LeaderFrontendState { + // Histories which have not been purged + private final Map localHistories; - private long expectedTxSequence; - private Long lastSeenHistory = null; + // RangeSet performs automatic merging, hence we keep minimal state tracking information + private final UnsignedLongRangeSet purgedHistories; + // Used for all standalone transactions + private final AbstractFrontendHistory standaloneHistory; - // TODO: explicit failover notification - // Record the ActorRef for the originating actor and when we switch to being a leader send a notification - // to the frontend client -- that way it can immediately start sending requests - - // TODO: add statistics: - // - number of requests processed - // - number of histories processed - // - per-RequestException throw counters + private long expectedTxSequence; + private Long lastSeenHistory = null; - LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.clientId = Preconditions.checkNotNull(clientId); - this.tree = Preconditions.checkNotNull(tree); - standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree); - } + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); + } - @Override - public ClientIdentifier getIdentifier() { - return clientId; - } + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { + super(persistenceId, clientId, tree); + this.purgedHistories = requireNonNull(purgedHistories); + this.standaloneHistory = requireNonNull(standaloneHistory); + this.localHistories = requireNonNull(localHistories); + } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException { - if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfOrderRequestException(expectedTxSequence); + @Override + @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + if (request instanceof CreateLocalHistoryRequest) { + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); + } else if (request instanceof DestroyLocalHistoryRequest) { + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); + } else if (request instanceof PurgeLocalHistoryRequest) { + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); + } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); + throw new UnsupportedRequestException(request); + } + } finally { + expectNextRequest(); + } } - } - private void expectNextRequest() { - expectedTxSequence++; - } + @Override + @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); + final AbstractFrontendHistory history; + + if (lhId.getHistoryId() != 0) { + history = localHistories.get(lhId); + if (history == null) { + if (purgedHistories.contains(lhId.getHistoryId())) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request); + throw new UnknownHistoryException(lastSeenHistory); + } + } else { + history = standaloneHistory; + } - @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); - - try { - if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request); - } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request, now); - } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request, now); - } else { - throw new UnsupportedRequestException(request); + return history.handleTransactionRequest(request, envelope, now); + } finally { + expectNextRequest(); } - } finally { - expectNextRequest(); } - } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(id); - if (existing != null) { - // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + @Override + void reconnect() { + expectedTxSequence = 0; + super.reconnect(); } - // We have not found the history. Before we create it we need to check history ID sequencing so that we do not - // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { - LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(lastSeenHistory.longValue()); + @Override + void retire() { + super.retire(); + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); } - // Update last history we have seen - if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { - lastSeenHistory = id.getHistoryId(); + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id))); - LOG.debug("{}: created history {}", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); + if (existing != null) { + // History already exists: report success + LOG.debug("{}: history {} already exists", persistenceId(), historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); + } + + // We have not found the history. Before we create it we need to check history ID sequencing so that we do + // not end up resurrecting a purged history. + if (purgedHistories.contains(historyId.getHistoryId())) { + LOG.debug("{}: rejecting purged request {}", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + // Update last history we have seen + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); + } + + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId(), historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), + tree().readTime() - now); + }); - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now) - throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.get(id); - if (existing == null) { - // History does not exist: report success - LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain)); + LOG.debug("{}: created history {}", persistenceId(), historyId); + return null; } - return existing.destroy(request.getSequence(), now); - } + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.get(id); + if (existing == null) { + // History does not exist: report success + LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); + } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now) - throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.remove(id); - if (existing != null) { - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + existing.destroy(request.getSequence(), envelope, now); + return null; + } - if (!existing.isDestroyed()) { - LOG.warn("{}: purging undestroyed history {}", persistenceId, id); - existing.destroy(request.getSequence(), now); + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.remove(id); + if (existing == null) { + LOG.debug("{}: history {} has already been purged", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); } - // FIXME: record a PURGE tombstone in the journal + LOG.debug("{}: purging history {}", persistenceId(), id); + purgedHistories.add(id.getHistoryId()); + existing.purge(request.getSequence(), envelope, now); + return null; + } - LOG.debug("{}: purged history {}", persistenceId, id); - } else { - LOG.debug("{}: history {} has already been purged", persistenceId, id); + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { + if (expectedTxSequence != envelope.getTxSequence()) { + throw new OutOfSequenceEnvelopeException(expectedTxSequence); + } } - return new LocalHistorySuccess(id, request.getSequence()); + private void expectNextRequest() { + expectedTxSequence++; + } } - @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); + private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); - try { - final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); - final AbstractFrontendHistory history; + private final ShardDataTree tree; + private final ClientIdentifier clientId; + private final String persistenceId; - if (lhId.getHistoryId() != 0) { - history = localHistories.get(lhId); - if (history == null) { - LOG.debug("{}: rejecting unknown history request {}", persistenceId, request); - throw new UnknownHistoryException(lastSeenHistory); - } - } else { - history = standaloneHistory; - } + private long lastConnectTicks; + private long lastSeenTicks; - return history.handleTransactionRequest(request, envelope, now); - } finally { - expectNextRequest(); - } + // TODO: explicit failover notification + // Record the ActorRef for the originating actor and when we switch to being a leader send a notification + // to the frontend client -- that way it can immediately start sending requests + + // TODO: add statistics: + // - number of requests processed + // - number of histories processed + // - per-RequestException throw counters + + LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this.persistenceId = requireNonNull(persistenceId); + this.clientId = requireNonNull(clientId); + this.tree = requireNonNull(tree); + this.lastSeenTicks = tree.readTime(); } + @Override + public final ClientIdentifier getIdentifier() { + return clientId; + } + + final String persistenceId() { + return persistenceId; + } + + final long getLastConnectTicks() { + return lastConnectTicks; + } + + final long getLastSeenTicks() { + return lastSeenTicks; + } + + final ShardDataTree tree() { + return tree; + } + + final void touch() { + this.lastSeenTicks = tree.readTime(); + } + + abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest request, + RequestEnvelope envelope, long now) throws RequestException; + + abstract @Nullable TransactionSuccess handleTransactionRequest(TransactionRequest request, + RequestEnvelope envelope, long now) throws RequestException; + void reconnect() { - expectedTxSequence = 0; + lastConnectTicks = tree.readTime(); } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } } @Override - public String toString() { - return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId) - .add("purgedHistories", purgedHistories).toString(); + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks); } }