From 33ade248cf6070455349fe343c0d0fd48d274717 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 10 May 2019 12:25:31 +0200 Subject: [PATCH 1/1] Allow transaction tracking to be disabled Ask-based protocol does not need tracking of transactions and histories, as it is not retransmitting requests. It also does not inform backend about purely-local aborted transactions (read-write and read-only), which leads to transaction tracking rangesets having holes where those IDs are used. This adds the prerequisite handling of disabling from the leader without adding the actual mechanics. JIRA: CONTROLLER-1879 Change-Id: I133e7688b492336937f394f0f6c3f080a05a820f Signed-off-by: Robert Varga Signed-off-by: Tomas Cere --- .../FrontendClientMetadataBuilder.java | 362 ++++++++++------- .../cluster/datastore/FrontendMetadata.java | 25 +- .../datastore/LeaderFrontendState.java | 365 ++++++++++-------- .../controller/cluster/datastore/Shard.java | 36 +- .../persisted/DisableTrackingPayload.java | 65 ++++ 5 files changed, 568 insertions(+), 285 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java index fbaf76fbc5..7e6eced779 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java @@ -11,7 +11,13 @@ import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.RangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.eclipse.jdt.annotation.NonNull; @@ -29,173 +35,271 @@ import org.slf4j.LoggerFactory; /** * This class is NOT thread-safe. */ -final class FrontendClientMetadataBuilder implements Builder, Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class); +abstract class FrontendClientMetadataBuilder implements Builder, + Identifiable { + static final class Disabled extends FrontendClientMetadataBuilder { + Disabled(final String shardName, final ClientIdentifier identifier) { + super(shardName, identifier); + } - private final Map currentHistories = new HashMap<>(); - private final UnsignedLongRangeSet purgedHistories; - private final LocalHistoryIdentifier standaloneId; - private final ClientIdentifier identifier; - private final String shardName; + @Override + public FrontendClientMetadata build() { + return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of()); + } - FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) { - this.shardName = requireNonNull(shardName); - this.identifier = requireNonNull(identifier); - purgedHistories = UnsignedLongRangeSet.create(); + @Override + void onHistoryCreated(final LocalHistoryIdentifier historyId) { + // No-op + } - // History for stand-alone transactions is always present - standaloneId = standaloneHistoryId(); - currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); - } + @Override + void onHistoryClosed(final LocalHistoryIdentifier historyId) { + // No-op + } - FrontendClientMetadataBuilder(final String shardName, final FrontendClientMetadata meta) { - this.shardName = requireNonNull(shardName); - this.identifier = meta.getIdentifier(); - purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories()); + @Override + void onHistoryPurged(final LocalHistoryIdentifier historyId) { + // No-op + } - for (FrontendHistoryMetadata h : meta.getCurrentHistories()) { - final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(identifier, h); - currentHistories.put(b.getIdentifier(), b); + @Override + void onTransactionAborted(final TransactionIdentifier txId) { + // No-op } - // Sanity check and recovery - standaloneId = standaloneHistoryId(); - if (!currentHistories.containsKey(standaloneId)) { - LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery", - shardName, identifier, currentHistories); - currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); + @Override + void onTransactionCommitted(final TransactionIdentifier txId) { + // No-op } - } - private LocalHistoryIdentifier standaloneHistoryId() { - return new LocalHistoryIdentifier(identifier, 0); - } + @Override + void onTransactionPurged(final TransactionIdentifier txId) { + // No-op + } - @Override - public FrontendClientMetadata build() { - return new FrontendClientMetadata(identifier, purgedHistories.toImmutable(), - Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build)); + @Override + LeaderFrontendState toLeaderState(final Shard shard) { + return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore()); + } } - @Override - public ClientIdentifier getIdentifier() { - return identifier; - } + static final class Enabled extends FrontendClientMetadataBuilder { + + private final Map currentHistories = new HashMap<>(); + private final UnsignedLongRangeSet purgedHistories; + private final LocalHistoryIdentifier standaloneId; + + Enabled(final String shardName, final ClientIdentifier identifier) { + super(shardName, identifier); + + purgedHistories = UnsignedLongRangeSet.create(); - void onHistoryCreated(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId); - final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta); - if (oldMeta != null) { - // This should not be happening, warn about it - LOG.warn("{}: Reused local history {}", shardName, historyId); - } else { - LOG.debug("{}: Created local history {}", shardName, historyId); + // History for stand-alone transactions is always present + standaloneId = standaloneHistoryId(); + currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); } - } - void onHistoryClosed(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId); - if (builder != null) { - builder.onHistoryClosed(); - LOG.debug("{}: Closed history {}", shardName, historyId); - } else { - LOG.warn("{}: Closed unknown history {}, ignoring", shardName, historyId); + Enabled(final String shardName, final FrontendClientMetadata meta) { + super(shardName, meta.getIdentifier()); + + purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories()); + for (FrontendHistoryMetadata h : meta.getCurrentHistories()) { + final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h); + currentHistories.put(b.getIdentifier(), b); + } + + // Sanity check and recovery + standaloneId = standaloneHistoryId(); + if (!currentHistories.containsKey(standaloneId)) { + LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery", + shardName, getIdentifier(), currentHistories); + currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); + } + } + + @Override + public FrontendClientMetadata build() { + return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(), + Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build)); + } + + @Override + void onHistoryCreated(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId); + final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta); + if (oldMeta != null) { + // This should not be happening, warn about it + LOG.warn("{}: Reused local history {}", shardName(), historyId); + } else { + LOG.debug("{}: Created local history {}", shardName(), historyId); + } } - } - void onHistoryPurged(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId); - final long historyBits = historyId.getHistoryId(); - if (history == null) { - if (!purgedHistories.contains(historyBits)) { + @Override + void onHistoryClosed(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId); + if (builder != null) { + builder.onHistoryClosed(); + LOG.debug("{}: Closed history {}", shardName(), historyId); + } else { + LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId); + } + } + + @Override + void onHistoryPurged(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId); + final long historyBits = historyId.getHistoryId(); + if (history == null) { + if (!purgedHistories.contains(historyBits)) { + purgedHistories.add(historyBits); + LOG.warn("{}: Purging unknown history {}", shardName(), historyId); + } else { + LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId); + } + } else { purgedHistories.add(historyBits); - LOG.warn("{}: Purging unknown history {}", shardName, historyId); + LOG.debug("{}: Purged history {}", shardName(), historyId); + } + } + + @Override + void onTransactionAborted(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionAborted(txId); + LOG.debug("{}: Aborted transaction {}", shardName(), txId); } else { - LOG.warn("{}: Duplicate purge of history {}", shardName, historyId); + LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId); } - } else { - purgedHistories.add(historyBits); - LOG.debug("{}: Purged history {}", shardName, historyId); } - } - void onTransactionAborted(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionAborted(txId); - LOG.debug("{}: Aborted transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName, txId); + @Override + void onTransactionCommitted(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionCommitted(txId); + LOG.debug("{}: Committed transaction {}", shardName(), txId); + } else { + LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId); + } } - } - void onTransactionCommitted(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionCommitted(txId); - LOG.debug("{}: Committed transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName, txId); + @Override + void onTransactionPurged(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionPurged(txId); + LOG.debug("{}: Purged transaction {}", shardName(), txId); + } else { + LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId); + } } - } - void onTransactionPurged(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionPurged(txId); - LOG.debug("{}: Purged transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName, txId); + @Override + LeaderFrontendState toLeaderState(final Shard shard) { + // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower + // interactions would get intertwined leading to inconsistencies. + final Map histories = new HashMap<>(); + for (FrontendHistoryMetadataBuilder e : currentHistories.values()) { + if (e.getIdentifier().getHistoryId() != 0) { + final AbstractFrontendHistory state = e.toLeaderState(shard); + verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state); + histories.put(e.getIdentifier(), (LocalFrontendHistory) state); + } + } + + final AbstractFrontendHistory singleHistory; + final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get( + new LocalHistoryIdentifier(getIdentifier(), 0)); + if (singleHistoryMeta == null) { + final ShardDataTree tree = shard.getDataStore(); + singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree); + } else { + singleHistory = singleHistoryMeta.toLeaderState(shard); + } + + return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(), + purgedHistories.copy(), singleHistory, histories); } - } - /** - * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart. - * - * @param shard parent shard - * @return Leader frontend state - */ - @NonNull LeaderFrontendState toLeaderState(final @NonNull Shard shard) { - // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower - // interactions would get intertwined leading to inconsistencies. - final Map histories = new HashMap<>(); - for (FrontendHistoryMetadataBuilder e : currentHistories.values()) { - if (e.getIdentifier().getHistoryId() != 0) { - final AbstractFrontendHistory state = e.toLeaderState(shard); - verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state); - histories.put(e.getIdentifier(), (LocalFrontendHistory) state); + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories); + } + + private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) { + LocalHistoryIdentifier historyId = txId.getHistoryId(); + if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) { + // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup + // needs to account for that. + LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId); + historyId = standaloneId; } + + return currentHistories.get(historyId); } - final AbstractFrontendHistory singleHistory; - final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get( - new LocalHistoryIdentifier(identifier, 0)); - if (singleHistoryMeta == null) { - final ShardDataTree tree = shard.getDataStore(); - singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree); - } else { - singleHistory = singleHistoryMeta.toLeaderState(shard); + private LocalHistoryIdentifier standaloneHistoryId() { + return new LocalHistoryIdentifier(getIdentifier(), 0); } + } - return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(), - purgedHistories.copy(), singleHistory, histories); + private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class); + + private final ClientIdentifier identifier; + private final String shardName; + + FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) { + this.shardName = requireNonNull(shardName); + this.identifier = requireNonNull(identifier); } - private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) { - LocalHistoryIdentifier historyId = txId.getHistoryId(); - if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) { - // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup - // needs to account for that. - LOG.debug("{}: looking up {} instead of {}", shardName, standaloneId, historyId); - historyId = standaloneId; - } + static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) { + final Collection current = meta.getCurrentHistories(); + final RangeSet purged = meta.getPurgedHistories(); + + // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history -- + // either purged or active + return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier()) + : new Enabled(shardName, meta); + } + + @Override + public final ClientIdentifier getIdentifier() { + return identifier; + } - return currentHistories.get(historyId); + final String shardName() { + return shardName; } + abstract void onHistoryCreated(LocalHistoryIdentifier historyId); + + abstract void onHistoryClosed(LocalHistoryIdentifier historyId); + + abstract void onHistoryPurged(LocalHistoryIdentifier historyId); + + abstract void onTransactionAborted(TransactionIdentifier txId); + + abstract void onTransactionCommitted(TransactionIdentifier txId); + + abstract void onTransactionPurged(TransactionIdentifier txId); + + /** + * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart. + * + * @param shard parent shard + * @return Leader frontend state + */ + abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard); + @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("identifier", identifier).add("current", currentHistories) - .add("purged", purgedHistories).toString(); + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("identifier", identifier); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java index 969accd583..f651efa3fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Verify.verify; + import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; import com.google.common.collect.Maps; @@ -57,7 +59,7 @@ final class FrontendMetadata extends ShardDataTreeMetadata toLeaderState(final @NonNull Shard shard) { return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard))); } + + void disableTracking(final ClientIdentifier clientId) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final FrontendClientMetadataBuilder client = clients.get(frontendId); + if (client == null) { + LOG.debug("{}: disableTracking {} does not match any client, ignoring", shardName, clientId); + return; + } + if (!clientId.equals(client.getIdentifier())) { + LOG.debug("{}: disableTracking {} does not match client {}, ignoring", shardName, clientId, client); + return; + } + if (client instanceof FrontendClientMetadataBuilder.Disabled) { + LOG.debug("{}: client {} is has already disabled tracking", shardName, client); + return; + } + + verify(clients.replace(frontendId, client, new FrontendClientMetadataBuilder.Disabled(shardName, clientId))); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 2a1537bd28..295dbdc005 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -41,25 +42,206 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -final class LeaderFrontendState implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); +abstract class LeaderFrontendState implements Identifiable { + static final class Disabled extends LeaderFrontendState { + Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + super(persistenceId, clientId, tree); + } + + @Override + LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } + + @Override + TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + throw new UnsupportedRequestException(request); + } + } + + static final class Enabled extends LeaderFrontendState { + // Histories which have not been purged + private final Map localHistories; + + // RangeSet performs automatic merging, hence we keep minimal state tracking information + private final UnsignedLongRangeSet purgedHistories; + + // Used for all standalone transactions + private final AbstractFrontendHistory standaloneHistory; + + private long expectedTxSequence; + private Long lastSeenHistory = null; + + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), + StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); + } + + Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { + super(persistenceId, clientId, tree); + this.purgedHistories = requireNonNull(purgedHistories); + this.standaloneHistory = requireNonNull(standaloneHistory); + this.localHistories = requireNonNull(localHistories); + } + + @Override + @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + if (request instanceof CreateLocalHistoryRequest) { + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); + } else if (request instanceof DestroyLocalHistoryRequest) { + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); + } else if (request instanceof PurgeLocalHistoryRequest) { + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); + } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request); + throw new UnsupportedRequestException(request); + } + } finally { + expectNextRequest(); + } + } + + @Override + @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + checkRequestSequence(envelope); + + try { + final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); + final AbstractFrontendHistory history; + + if (lhId.getHistoryId() != 0) { + history = localHistories.get(lhId); + if (history == null) { + if (purgedHistories.contains(lhId.getHistoryId())) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request); + throw new UnknownHistoryException(lastSeenHistory); + } + } else { + history = standaloneHistory; + } + + return history.handleTransactionRequest(request, envelope, now); + } finally { + expectNextRequest(); + } + } + + @Override + void reconnect() { + expectedTxSequence = 0; + super.reconnect(); + } + + @Override + void retire() { + super.retire(); + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); + } + + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories); + } + + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); + if (existing != null) { + // History already exists: report success + LOG.debug("{}: history {} already exists", persistenceId(), historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); + } + + // We have not found the history. Before we create it we need to check history ID sequencing so that we do + // not end up resurrecting a purged history. + if (purgedHistories.contains(historyId.getHistoryId())) { + LOG.debug("{}: rejecting purged request {}", persistenceId(), request); + throw new DeadHistoryException(purgedHistories.toImmutable()); + } + + // Update last history we have seen + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); + } + + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId(), historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), + tree().readTime() - now); + }); + + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain)); + LOG.debug("{}: created history {}", persistenceId(), historyId); + return null; + } + + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.get(id); + if (existing == null) { + // History does not exist: report success + LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + existing.destroy(request.getSequence(), envelope, now); + return null; + } + + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.remove(id); + if (existing == null) { + LOG.debug("{}: history {} has already been purged", persistenceId(), id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + LOG.debug("{}: purging history {}", persistenceId(), id); + purgedHistories.add(id.getHistoryId()); + existing.purge(request.getSequence(), envelope, now); + return null; + } + + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { + if (expectedTxSequence != envelope.getTxSequence()) { + throw new OutOfSequenceEnvelopeException(expectedTxSequence); + } + } - // Histories which have not been purged - private final Map localHistories; + private void expectNextRequest() { + expectedTxSequence++; + } + } - // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final UnsignedLongRangeSet purgedHistories; + private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); - // Used for all standalone transactions - private final AbstractFrontendHistory standaloneHistory; private final ShardDataTree tree; private final ClientIdentifier clientId; private final String persistenceId; private long lastConnectTicks; private long lastSeenTicks; - private long expectedTxSequence; - private Long lastSeenHistory = null; // TODO: explicit failover notification // Record the ActorRef for the originating actor and when we switch to being a leader send a notification @@ -71,150 +253,44 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(), - StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>()); - } - - LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, - final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, - final Map localHistories) { this.persistenceId = requireNonNull(persistenceId); this.clientId = requireNonNull(clientId); this.tree = requireNonNull(tree); - this.purgedHistories = requireNonNull(purgedHistories); - this.standaloneHistory = requireNonNull(standaloneHistory); - this.localHistories = requireNonNull(localHistories); this.lastSeenTicks = tree.readTime(); } @Override - public ClientIdentifier getIdentifier() { + public final ClientIdentifier getIdentifier() { return clientId; } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { - if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfSequenceEnvelopeException(expectedTxSequence); - } - } - - private void expectNextRequest() { - expectedTxSequence++; + final String persistenceId() { + return persistenceId; } - @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); - - try { - if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); - } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); - } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); - } else { - LOG.warn("{}: rejecting unsupported request {}", persistenceId, request); - throw new UnsupportedRequestException(request); - } - } finally { - expectNextRequest(); - } + final long getLastConnectTicks() { + return lastConnectTicks; } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - final LocalHistoryIdentifier historyId = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(historyId); - if (existing != null) { - // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, historyId); - return new LocalHistorySuccess(historyId, request.getSequence()); - } - - // We have not found the history. Before we create it we need to check history ID sequencing so that we do not - // end up resurrecting a purged history. - if (purgedHistories.contains(historyId.getHistoryId())) { - LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(purgedHistories.toImmutable()); - } - - // Update last history we have seen - if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { - lastSeenHistory = historyId.getHistoryId(); - } - - // We have to send the response only after persistence has completed - final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> { - LOG.debug("{}: persisted history {}", persistenceId, historyId); - envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now); - }); - - localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain)); - LOG.debug("{}: created history {}", persistenceId, historyId); - return null; + final long getLastSeenTicks() { + return lastSeenTicks; } - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.get(id); - if (existing == null) { - // History does not exist: report success - LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } - - existing.destroy(request.getSequence(), envelope, now); - return null; + final ShardDataTree tree() { + return tree; } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, - final RequestEnvelope envelope, final long now) { - final LocalHistoryIdentifier id = request.getTarget(); - final LocalFrontendHistory existing = localHistories.remove(id); - if (existing == null) { - LOG.debug("{}: history {} has already been purged", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); - } - - LOG.debug("{}: purging history {}", persistenceId, id); - purgedHistories.add(id.getHistoryId()); - existing.purge(request.getSequence(), envelope, now); - return null; + final void touch() { + this.lastSeenTicks = tree.readTime(); } - @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - checkRequestSequence(envelope); - - try { - final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); - final AbstractFrontendHistory history; - - if (lhId.getHistoryId() != 0) { - history = localHistories.get(lhId); - if (history == null) { - if (purgedHistories.contains(lhId.getHistoryId())) { - LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); - throw new DeadHistoryException(purgedHistories.toImmutable()); - } - - LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); - throw new UnknownHistoryException(lastSeenHistory); - } - } else { - history = standaloneHistory; - } + abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest request, + RequestEnvelope envelope, long now) throws RequestException; - return history.handleTransactionRequest(request, envelope, now); - } finally { - expectNextRequest(); - } - } + abstract @Nullable TransactionSuccess handleTransactionRequest(TransactionRequest request, + RequestEnvelope envelope, long now) throws RequestException; void reconnect() { - expectedTxSequence = 0; lastConnectTicks = tree.readTime(); } @@ -233,31 +309,14 @@ final class LeaderFrontendState implements Identifiable { } } } - - // Clear out all transaction chains - localHistories.values().forEach(AbstractFrontendHistory::retire); - localHistories.clear(); - standaloneHistory.retire(); - } - - long getLastConnectTicks() { - return lastConnectTicks; } - long getLastSeenTicks() { - return lastSeenTicks; - } - - void touch() { - this.lastSeenTicks = tree.readTime(); + @Override + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); } - @Override - public String toString() { - return MoreObjects.toStringHelper(LeaderFrontendState.class) - .add("clientId", clientId) - .add("nanosAgo", tree.readTime() - lastSeenTicks) - .add("purgedHistories", purgedHistories) - .toString(); + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 97b4a48f10..fb6b0142fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Verify.verify; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; @@ -83,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload; import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload; import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; @@ -436,6 +439,32 @@ public class Shard extends RaftActor { return Optional.of(state.getLastConnectTicks()); } + private void disableTracking(final DisableTrackingPayload payload) { + final ClientIdentifier clientId = payload.getIdentifier(); + LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId); + frontendMetadata.disableTracking(clientId); + + if (isLeader()) { + final FrontendIdentifier frontendId = clientId.getFrontendId(); + final LeaderFrontendState frontend = knownFrontends.get(frontendId); + if (frontend != null) { + if (clientId.equals(frontend.getIdentifier())) { + if (!(frontend instanceof LeaderFrontendState.Disabled)) { + verify(knownFrontends.replace(frontendId, frontend, + new LeaderFrontendState.Disabled(persistenceId(), clientId, store))); + LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId); + } else { + LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend); + } + } else { + LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId); + } + } else { + LOG.debug("{}: leader state for {} not found", persistenceId(), clientId); + } + } + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -529,7 +558,7 @@ public class Shard extends RaftActor { final ABIVersion selectedVersion = selectVersion(message); final LeaderFrontendState frontend; if (existing == null) { - frontend = new LeaderFrontendState(persistenceId(), clientId, store); + frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store); knownFrontends.put(clientId.getFrontendId(), frontend); LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId); } else { @@ -873,6 +902,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if (data instanceof Payload) { + if (data instanceof DisableTrackingPayload) { + disableTracking((DisableTrackingPayload) data); + return; + } + try { store.applyReplicatedPayload(identifier, (Payload)data); } catch (DataValidationFailedException | IOException e) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java new file mode 100644 index 0000000000..29dd072524 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class DisableTrackingPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected ClientIdentifier readIdentifier(final DataInput in) throws IOException { + return ClientIdentifier.readFrom(in); + } + + @Override + protected DisableTrackingPayload createObject(final ClientIdentifier identifier, + final byte[] serialized) { + return new DisableTrackingPayload(identifier, serialized); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class); + private static final long serialVersionUID = 1L; + + DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) { + super(clientId, serialized); + } + + public static DisableTrackingPayload create(final ClientIdentifier clientId, + final int initialSerializedBufferCapacity) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity); + try { + clientId.writeTo(out); + } catch (IOException e) { + // This should never happen + LOG.error("Failed to serialize {}", clientId, e); + throw new RuntimeException("Failed to serialize " + clientId, e); + } + return new DisableTrackingPayload(clientId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} -- 2.36.6