import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNull;
/**
* This class is NOT thread-safe.
*/
-final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>,
+ Identifiable<ClientIdentifier> {
+ static final class Disabled extends FrontendClientMetadataBuilder {
+ Disabled(final String shardName, final ClientIdentifier identifier) {
+ super(shardName, identifier);
+ }
- private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
- private final UnsignedLongRangeSet purgedHistories;
- private final LocalHistoryIdentifier standaloneId;
- private final ClientIdentifier identifier;
- private final String shardName;
+ @Override
+ public FrontendClientMetadata build() {
+ return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of());
+ }
- FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
- this.shardName = requireNonNull(shardName);
- this.identifier = requireNonNull(identifier);
- purgedHistories = UnsignedLongRangeSet.create();
+ @Override
+ void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+ // No-op
+ }
- // History for stand-alone transactions is always present
- standaloneId = standaloneHistoryId();
- currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
- }
+ @Override
+ void onHistoryClosed(final LocalHistoryIdentifier historyId) {
+ // No-op
+ }
- FrontendClientMetadataBuilder(final String shardName, final FrontendClientMetadata meta) {
- this.shardName = requireNonNull(shardName);
- this.identifier = meta.getIdentifier();
- purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+ @Override
+ void onHistoryPurged(final LocalHistoryIdentifier historyId) {
+ // No-op
+ }
- for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
- final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(identifier, h);
- currentHistories.put(b.getIdentifier(), b);
+ @Override
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ // No-op
}
- // Sanity check and recovery
- standaloneId = standaloneHistoryId();
- if (!currentHistories.containsKey(standaloneId)) {
- LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
- shardName, identifier, currentHistories);
- currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+ @Override
+ void onTransactionCommitted(final TransactionIdentifier txId) {
+ // No-op
}
- }
- private LocalHistoryIdentifier standaloneHistoryId() {
- return new LocalHistoryIdentifier(identifier, 0);
- }
+ @Override
+ void onTransactionPurged(final TransactionIdentifier txId) {
+ // No-op
+ }
- @Override
- public FrontendClientMetadata build() {
- return new FrontendClientMetadata(identifier, purgedHistories.toImmutable(),
- Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
+ @Override
+ LeaderFrontendState toLeaderState(final Shard shard) {
+ return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
+ }
}
- @Override
- public ClientIdentifier getIdentifier() {
- return identifier;
- }
+ static final class Enabled extends FrontendClientMetadataBuilder {
+
+ private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
+ private final UnsignedLongRangeSet purgedHistories;
+ private final LocalHistoryIdentifier standaloneId;
+
+ Enabled(final String shardName, final ClientIdentifier identifier) {
+ super(shardName, identifier);
+
+ purgedHistories = UnsignedLongRangeSet.create();
- void onHistoryCreated(final LocalHistoryIdentifier historyId) {
- final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
- final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
- if (oldMeta != null) {
- // This should not be happening, warn about it
- LOG.warn("{}: Reused local history {}", shardName, historyId);
- } else {
- LOG.debug("{}: Created local history {}", shardName, historyId);
+ // History for stand-alone transactions is always present
+ standaloneId = standaloneHistoryId();
+ currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
}
- }
- void onHistoryClosed(final LocalHistoryIdentifier historyId) {
- final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
- if (builder != null) {
- builder.onHistoryClosed();
- LOG.debug("{}: Closed history {}", shardName, historyId);
- } else {
- LOG.warn("{}: Closed unknown history {}, ignoring", shardName, historyId);
+ Enabled(final String shardName, final FrontendClientMetadata meta) {
+ super(shardName, meta.getIdentifier());
+
+ purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+ for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
+ final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
+ currentHistories.put(b.getIdentifier(), b);
+ }
+
+ // Sanity check and recovery
+ standaloneId = standaloneHistoryId();
+ if (!currentHistories.containsKey(standaloneId)) {
+ LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
+ shardName, getIdentifier(), currentHistories);
+ currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+ }
+ }
+
+ @Override
+ public FrontendClientMetadata build() {
+ return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(),
+ Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
+ }
+
+ @Override
+ void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+ final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+ if (oldMeta != null) {
+ // This should not be happening, warn about it
+ LOG.warn("{}: Reused local history {}", shardName(), historyId);
+ } else {
+ LOG.debug("{}: Created local history {}", shardName(), historyId);
+ }
}
- }
- void onHistoryPurged(final LocalHistoryIdentifier historyId) {
- final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
- final long historyBits = historyId.getHistoryId();
- if (history == null) {
- if (!purgedHistories.contains(historyBits)) {
+ @Override
+ void onHistoryClosed(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+ if (builder != null) {
+ builder.onHistoryClosed();
+ LOG.debug("{}: Closed history {}", shardName(), historyId);
+ } else {
+ LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId);
+ }
+ }
+
+ @Override
+ void onHistoryPurged(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+ final long historyBits = historyId.getHistoryId();
+ if (history == null) {
+ if (!purgedHistories.contains(historyBits)) {
+ purgedHistories.add(historyBits);
+ LOG.warn("{}: Purging unknown history {}", shardName(), historyId);
+ } else {
+ LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId);
+ }
+ } else {
purgedHistories.add(historyBits);
- LOG.warn("{}: Purging unknown history {}", shardName, historyId);
+ LOG.debug("{}: Purged history {}", shardName(), historyId);
+ }
+ }
+
+ @Override
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionAborted(txId);
+ LOG.debug("{}: Aborted transaction {}", shardName(), txId);
} else {
- LOG.warn("{}: Duplicate purge of history {}", shardName, historyId);
+ LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId);
}
- } else {
- purgedHistories.add(historyBits);
- LOG.debug("{}: Purged history {}", shardName, historyId);
}
- }
- void onTransactionAborted(final TransactionIdentifier txId) {
- final FrontendHistoryMetadataBuilder history = getHistory(txId);
- if (history != null) {
- history.onTransactionAborted(txId);
- LOG.debug("{}: Aborted transaction {}", shardName, txId);
- } else {
- LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName, txId);
+ @Override
+ void onTransactionCommitted(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionCommitted(txId);
+ LOG.debug("{}: Committed transaction {}", shardName(), txId);
+ } else {
+ LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId);
+ }
}
- }
- void onTransactionCommitted(final TransactionIdentifier txId) {
- final FrontendHistoryMetadataBuilder history = getHistory(txId);
- if (history != null) {
- history.onTransactionCommitted(txId);
- LOG.debug("{}: Committed transaction {}", shardName, txId);
- } else {
- LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName, txId);
+ @Override
+ void onTransactionPurged(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionPurged(txId);
+ LOG.debug("{}: Purged transaction {}", shardName(), txId);
+ } else {
+ LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId);
+ }
}
- }
- void onTransactionPurged(final TransactionIdentifier txId) {
- final FrontendHistoryMetadataBuilder history = getHistory(txId);
- if (history != null) {
- history.onTransactionPurged(txId);
- LOG.debug("{}: Purged transaction {}", shardName, txId);
- } else {
- LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName, txId);
+ @Override
+ LeaderFrontendState toLeaderState(final Shard shard) {
+ // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
+ // interactions would get intertwined leading to inconsistencies.
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
+ for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
+ if (e.getIdentifier().getHistoryId() != 0) {
+ final AbstractFrontendHistory state = e.toLeaderState(shard);
+ verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
+ histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+ }
+ }
+
+ final AbstractFrontendHistory singleHistory;
+ final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
+ new LocalHistoryIdentifier(getIdentifier(), 0));
+ if (singleHistoryMeta == null) {
+ final ShardDataTree tree = shard.getDataStore();
+ singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
+ } else {
+ singleHistory = singleHistoryMeta.toLeaderState(shard);
+ }
+
+ return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
+ purgedHistories.copy(), singleHistory, histories);
}
- }
- /**
- * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
- *
- * @param shard parent shard
- * @return Leader frontend state
- */
- @NonNull LeaderFrontendState toLeaderState(final @NonNull Shard shard) {
- // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
- // interactions would get intertwined leading to inconsistencies.
- final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
- for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
- if (e.getIdentifier().getHistoryId() != 0) {
- final AbstractFrontendHistory state = e.toLeaderState(shard);
- verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
- histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories);
+ }
+
+ private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
+ LocalHistoryIdentifier historyId = txId.getHistoryId();
+ if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
+ // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
+ // needs to account for that.
+ LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
+ historyId = standaloneId;
}
+
+ return currentHistories.get(historyId);
}
- final AbstractFrontendHistory singleHistory;
- final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
- new LocalHistoryIdentifier(identifier, 0));
- if (singleHistoryMeta == null) {
- final ShardDataTree tree = shard.getDataStore();
- singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
- } else {
- singleHistory = singleHistoryMeta.toLeaderState(shard);
+ private LocalHistoryIdentifier standaloneHistoryId() {
+ return new LocalHistoryIdentifier(getIdentifier(), 0);
}
+ }
- return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
- purgedHistories.copy(), singleHistory, histories);
+ private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+
+ private final ClientIdentifier identifier;
+ private final String shardName;
+
+ FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
+ this.shardName = requireNonNull(shardName);
+ this.identifier = requireNonNull(identifier);
}
- private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
- LocalHistoryIdentifier historyId = txId.getHistoryId();
- if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
- // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
- // needs to account for that.
- LOG.debug("{}: looking up {} instead of {}", shardName, standaloneId, historyId);
- historyId = standaloneId;
- }
+ static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
+ final Collection<FrontendHistoryMetadata> current = meta.getCurrentHistories();
+ final RangeSet<UnsignedLong> purged = meta.getPurgedHistories();
+
+ // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
+ // either purged or active
+ return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier())
+ : new Enabled(shardName, meta);
+ }
+
+ @Override
+ public final ClientIdentifier getIdentifier() {
+ return identifier;
+ }
- return currentHistories.get(historyId);
+ final String shardName() {
+ return shardName;
}
+ abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
+
+ abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
+
+ abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
+
+ abstract void onTransactionAborted(TransactionIdentifier txId);
+
+ abstract void onTransactionCommitted(TransactionIdentifier txId);
+
+ abstract void onTransactionPurged(TransactionIdentifier txId);
+
+ /**
+ * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
+ *
+ * @param shard parent shard
+ * @return Leader frontend state
+ */
+ abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard);
+
@Override
- public String toString() {
- return MoreObjects.toStringHelper(this).add("identifier", identifier).add("current", currentHistories)
- .add("purged", purgedHistories).toString();
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+ }
+
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return helper.add("identifier", identifier);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Verify.verify;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Maps;
for (FrontendClientMetadata m : snapshot.getClients()) {
LOG.debug("{}: applying metadata {}", shardName, m);
- final FrontendClientMetadataBuilder b = new FrontendClientMetadataBuilder(shardName, m);
+ final FrontendClientMetadataBuilder b = FrontendClientMetadataBuilder.of(shardName, m);
final FrontendIdentifier client = m.getIdentifier().getFrontendId();
LOG.debug("{}: client {} updated to {}", shardName, client, b);
return existing;
}
- final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder(shardName, id);
+ final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder.Enabled(shardName, id);
final FrontendClientMetadataBuilder previous = clients.put(id.getFrontendId(), client);
if (previous != null) {
LOG.debug("{}: Replaced client {} with {}", shardName, previous, client);
@NonNull Map<FrontendIdentifier, LeaderFrontendState> toLeaderState(final @NonNull Shard shard) {
return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard)));
}
+
+ void disableTracking(final ClientIdentifier clientId) {
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+ final FrontendClientMetadataBuilder client = clients.get(frontendId);
+ if (client == null) {
+ LOG.debug("{}: disableTracking {} does not match any client, ignoring", shardName, clientId);
+ return;
+ }
+ if (!clientId.equals(client.getIdentifier())) {
+ LOG.debug("{}: disableTracking {} does not match client {}, ignoring", shardName, clientId, client);
+ return;
+ }
+ if (client instanceof FrontendClientMetadataBuilder.Disabled) {
+ LOG.debug("{}: client {} is has already disabled tracking", shardName, client);
+ return;
+ }
+
+ verify(clients.replace(frontendId, client, new FrontendClientMetadataBuilder.Disabled(shardName, clientId)));
+ }
}
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
*
* @author Robert Varga
*/
-final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+abstract class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+ static final class Disabled extends LeaderFrontendState {
+ Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+ super(persistenceId, clientId, tree);
+ }
+
+ @Override
+ LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ throw new UnsupportedRequestException(request);
+ }
+
+ @Override
+ TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ static final class Enabled extends LeaderFrontendState {
+ // Histories which have not been purged
+ private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+
+ // RangeSet performs automatic merging, hence we keep minimal state tracking information
+ private final UnsignedLongRangeSet purgedHistories;
+
+ // Used for all standalone transactions
+ private final AbstractFrontendHistory standaloneHistory;
+
+ private long expectedTxSequence;
+ private Long lastSeenHistory = null;
+
+ Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+ this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+ StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
+ }
+
+ Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+ final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
+ super(persistenceId, clientId, tree);
+ this.purgedHistories = requireNonNull(purgedHistories);
+ this.standaloneHistory = requireNonNull(standaloneHistory);
+ this.localHistories = requireNonNull(localHistories);
+ }
+
+ @Override
+ @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ checkRequestSequence(envelope);
+
+ try {
+ if (request instanceof CreateLocalHistoryRequest) {
+ return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
+ } else if (request instanceof DestroyLocalHistoryRequest) {
+ return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
+ } else if (request instanceof PurgeLocalHistoryRequest) {
+ return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
+ } else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+ throw new UnsupportedRequestException(request);
+ }
+ } finally {
+ expectNextRequest();
+ }
+ }
+
+ @Override
+ @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ checkRequestSequence(envelope);
+
+ try {
+ final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+ final AbstractFrontendHistory history;
+
+ if (lhId.getHistoryId() != 0) {
+ history = localHistories.get(lhId);
+ if (history == null) {
+ if (purgedHistories.contains(lhId.getHistoryId())) {
+ LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
+ throw new DeadHistoryException(purgedHistories.toImmutable());
+ }
+
+ LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
+ throw new UnknownHistoryException(lastSeenHistory);
+ }
+ } else {
+ history = standaloneHistory;
+ }
+
+ return history.handleTransactionRequest(request, envelope, now);
+ } finally {
+ expectNextRequest();
+ }
+ }
+
+ @Override
+ void reconnect() {
+ expectedTxSequence = 0;
+ super.reconnect();
+ }
+
+ @Override
+ void retire() {
+ super.retire();
+
+ // Clear out all transaction chains
+ localHistories.values().forEach(AbstractFrontendHistory::retire);
+ localHistories.clear();
+ standaloneHistory.retire();
+ }
+
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
+ }
+
+ private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ final LocalHistoryIdentifier historyId = request.getTarget();
+ final AbstractFrontendHistory existing = localHistories.get(historyId);
+ if (existing != null) {
+ // History already exists: report success
+ LOG.debug("{}: history {} already exists", persistenceId(), historyId);
+ return new LocalHistorySuccess(historyId, request.getSequence());
+ }
+
+ // We have not found the history. Before we create it we need to check history ID sequencing so that we do
+ // not end up resurrecting a purged history.
+ if (purgedHistories.contains(historyId.getHistoryId())) {
+ LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
+ throw new DeadHistoryException(purgedHistories.toImmutable());
+ }
+
+ // Update last history we have seen
+ if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+ lastSeenHistory = historyId.getHistoryId();
+ }
+
+ // We have to send the response only after persistence has completed
+ final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> {
+ LOG.debug("{}: persisted history {}", persistenceId(), historyId);
+ envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
+ tree().readTime() - now);
+ });
+
+ localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
+ LOG.debug("{}: created history {}", persistenceId(), historyId);
+ return null;
+ }
+
+ private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) {
+ final LocalHistoryIdentifier id = request.getTarget();
+ final LocalFrontendHistory existing = localHistories.get(id);
+ if (existing == null) {
+ // History does not exist: report success
+ LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
+ return new LocalHistorySuccess(id, request.getSequence());
+ }
+
+ existing.destroy(request.getSequence(), envelope, now);
+ return null;
+ }
+
+ private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) {
+ final LocalHistoryIdentifier id = request.getTarget();
+ final LocalFrontendHistory existing = localHistories.remove(id);
+ if (existing == null) {
+ LOG.debug("{}: history {} has already been purged", persistenceId(), id);
+ return new LocalHistorySuccess(id, request.getSequence());
+ }
+
+ LOG.debug("{}: purging history {}", persistenceId(), id);
+ purgedHistories.add(id.getHistoryId());
+ existing.purge(request.getSequence(), envelope, now);
+ return null;
+ }
+
+ private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
+ if (expectedTxSequence != envelope.getTxSequence()) {
+ throw new OutOfSequenceEnvelopeException(expectedTxSequence);
+ }
+ }
- // Histories which have not been purged
- private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+ private void expectNextRequest() {
+ expectedTxSequence++;
+ }
+ }
- // RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final UnsignedLongRangeSet purgedHistories;
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
- // Used for all standalone transactions
- private final AbstractFrontendHistory standaloneHistory;
private final ShardDataTree tree;
private final ClientIdentifier clientId;
private final String persistenceId;
private long lastConnectTicks;
private long lastSeenTicks;
- private long expectedTxSequence;
- private Long lastSeenHistory = null;
// TODO: explicit failover notification
// Record the ActorRef for the originating actor and when we switch to being a leader send a notification
// - per-RequestException throw counters
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
- this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
- StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
- }
-
- LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
- final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
- final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
this.persistenceId = requireNonNull(persistenceId);
this.clientId = requireNonNull(clientId);
this.tree = requireNonNull(tree);
- this.purgedHistories = requireNonNull(purgedHistories);
- this.standaloneHistory = requireNonNull(standaloneHistory);
- this.localHistories = requireNonNull(localHistories);
this.lastSeenTicks = tree.readTime();
}
@Override
- public ClientIdentifier getIdentifier() {
+ public final ClientIdentifier getIdentifier() {
return clientId;
}
- private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
- if (expectedTxSequence != envelope.getTxSequence()) {
- throw new OutOfSequenceEnvelopeException(expectedTxSequence);
- }
- }
-
- private void expectNextRequest() {
- expectedTxSequence++;
+ final String persistenceId() {
+ return persistenceId;
}
- @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- checkRequestSequence(envelope);
-
- try {
- if (request instanceof CreateLocalHistoryRequest) {
- return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
- } else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
- } else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
- } else {
- LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
- throw new UnsupportedRequestException(request);
- }
- } finally {
- expectNextRequest();
- }
+ final long getLastConnectTicks() {
+ return lastConnectTicks;
}
- private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- final LocalHistoryIdentifier historyId = request.getTarget();
- final AbstractFrontendHistory existing = localHistories.get(historyId);
- if (existing != null) {
- // History already exists: report success
- LOG.debug("{}: history {} already exists", persistenceId, historyId);
- return new LocalHistorySuccess(historyId, request.getSequence());
- }
-
- // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
- // end up resurrecting a purged history.
- if (purgedHistories.contains(historyId.getHistoryId())) {
- LOG.debug("{}: rejecting purged request {}", persistenceId, request);
- throw new DeadHistoryException(purgedHistories.toImmutable());
- }
-
- // Update last history we have seen
- if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
- lastSeenHistory = historyId.getHistoryId();
- }
-
- // We have to send the response only after persistence has completed
- final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
- LOG.debug("{}: persisted history {}", persistenceId, historyId);
- envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
- });
-
- localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
- LOG.debug("{}: created history {}", persistenceId, historyId);
- return null;
+ final long getLastSeenTicks() {
+ return lastSeenTicks;
}
- private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
- final RequestEnvelope envelope, final long now) {
- final LocalHistoryIdentifier id = request.getTarget();
- final LocalFrontendHistory existing = localHistories.get(id);
- if (existing == null) {
- // History does not exist: report success
- LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
- }
-
- existing.destroy(request.getSequence(), envelope, now);
- return null;
+ final ShardDataTree tree() {
+ return tree;
}
- private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
- final RequestEnvelope envelope, final long now) {
- final LocalHistoryIdentifier id = request.getTarget();
- final LocalFrontendHistory existing = localHistories.remove(id);
- if (existing == null) {
- LOG.debug("{}: history {} has already been purged", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
- }
-
- LOG.debug("{}: purging history {}", persistenceId, id);
- purgedHistories.add(id.getHistoryId());
- existing.purge(request.getSequence(), envelope, now);
- return null;
+ final void touch() {
+ this.lastSeenTicks = tree.readTime();
}
- @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- checkRequestSequence(envelope);
-
- try {
- final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
- final AbstractFrontendHistory history;
-
- if (lhId.getHistoryId() != 0) {
- history = localHistories.get(lhId);
- if (history == null) {
- if (purgedHistories.contains(lhId.getHistoryId())) {
- LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
- throw new DeadHistoryException(purgedHistories.toImmutable());
- }
-
- LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
- throw new UnknownHistoryException(lastSeenHistory);
- }
- } else {
- history = standaloneHistory;
- }
+ abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
+ RequestEnvelope envelope, long now) throws RequestException;
- return history.handleTransactionRequest(request, envelope, now);
- } finally {
- expectNextRequest();
- }
- }
+ abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
+ RequestEnvelope envelope, long now) throws RequestException;
+ // Records the time at which the client (re)connected. NOTE(review): this
+ // patch removes the expectedTxSequence reset from here — presumably that
+ // tracking now lives in a subclass; verify nothing relies on it happening here.
 void reconnect() {
- expectedTxSequence = 0;
 lastConnectTicks = tree.readTime();
 }
}
}
}
-
- // Clear out all transaction chains
- localHistories.values().forEach(AbstractFrontendHistory::retire);
- localHistories.clear();
- standaloneHistory.retire();
- }
-
- long getLastConnectTicks() {
- return lastConnectTicks;
}
- long getLastSeenTicks() {
- return lastSeenTicks;
- }
-
- void touch() {
- this.lastSeenTicks = tree.readTime();
+ // Template method: toString() is final and delegates to addToStringAttributes()
+ // so subclasses can append their own attributes without re-implementing it.
+ @Override
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
 }
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(LeaderFrontendState.class)
- .add("clientId", clientId)
- .add("nanosAgo", tree.readTime() - lastSeenTicks)
- .add("purgedHistories", purgedHistories)
- .toString();
+ // Base attributes common to all subclasses. NOTE(review): the old toString()
+ // also reported purgedHistories — presumably a subclass override adds it now;
+ // confirm that attribute is not silently lost.
+ ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);
 }
}
*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Verify.verify;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
return Optional.of(state.getLastConnectTicks());
}
+ // Applies a replicated DisableTrackingPayload: disables tracking of the client
+ // in persisted frontend metadata and, when this shard is the leader, swaps the
+ // client's in-memory state for a Disabled placeholder. Stale or missing leader
+ // state is logged and left untouched.
+ private void disableTracking(final DisableTrackingPayload payload) {
+ final ClientIdentifier clientId = payload.getIdentifier();
+ LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+ frontendMetadata.disableTracking(clientId);
+
+ if (isLeader()) {
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+ final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+ if (frontend != null) {
+ // Only replace state belonging to this exact client generation.
+ if (clientId.equals(frontend.getIdentifier())) {
+ if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+ // Three-arg Map.replace() only succeeds if the entry still maps to
+ // 'frontend'; verify() turns an unexpected concurrent swap into a hard error.
+ verify(knownFrontends.replace(frontendId, frontend,
+ new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+ LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+ } else {
+ LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+ }
+ } else {
+ LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+ }
+ } else {
+ LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ }
+ }
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
final ABIVersion selectedVersion = selectVersion(message);
final LeaderFrontendState frontend;
if (existing == null) {
- frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
knownFrontends.put(clientId.getFrontendId(), frontend);
LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
} else {
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof Payload) {
+ if (data instanceof DisableTrackingPayload) {
+ disableTracking((DisableTrackingPayload) data);
+ return;
+ }
+
try {
store.applyReplicatedPayload(identifier, (Payload)data);
} catch (DataValidationFailedException | IOException e) {
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Persisted payload requesting that state tracking of a particular frontend
+ * client be disabled. Carries the target {@link ClientIdentifier} in
+ * pre-serialized form alongside the identifier object itself.
+ */
+public final class DisableTrackingPayload extends AbstractIdentifiablePayload<ClientIdentifier> {
+    // Externalizable serialization proxy: the payload travels as the raw
+    // serialized identifier bytes and is rebuilt via createObject().
+    private static final class Proxy extends AbstractProxy<ClientIdentifier> {
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected ClientIdentifier readIdentifier(final DataInput in) throws IOException {
+            return ClientIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected DisableTrackingPayload createObject(final ClientIdentifier identifier,
+                final byte[] serialized) {
+            return new DisableTrackingPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
+        super(clientId, serialized);
+    }
+
+    /**
+     * Creates a payload by serializing {@code clientId} into an in-memory buffer.
+     * The initial buffer capacity is a sizing hint only; the buffer grows as needed.
+     * An IOException from an in-memory stream should be impossible, so it is
+     * logged and rethrown as a RuntimeException rather than declared.
+     */
+    public static DisableTrackingPayload create(final ClientIdentifier clientId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
+        try {
+            clientId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", clientId, e);
+            throw new RuntimeException("Failed to serialize " + clientId, e);
+        }
+        return new DisableTrackingPayload(clientId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}