X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendClientMetadataBuilder.java;h=7e6eced779bdf4026dca36134b2b13e7509f1bbb;hp=fbaf76fbc5098b6ea2ca07929de5acb00ed9e587;hb=33ade248cf6070455349fe343c0d0fd48d274717;hpb=522b2d4f69cb0c1ca689b9826498a9e01ac0ae7c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java index fbaf76fbc5..7e6eced779 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java @@ -11,7 +11,13 @@ import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.RangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.eclipse.jdt.annotation.NonNull; @@ -29,173 +35,271 @@ import org.slf4j.LoggerFactory; /** * This class is NOT thread-safe. */ -final class FrontendClientMetadataBuilder implements Builder, Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class); +abstract class FrontendClientMetadataBuilder implements Builder, + Identifiable { + static final class Disabled extends FrontendClientMetadataBuilder { + Disabled(final String shardName, final ClientIdentifier identifier) { + super(shardName, identifier); + } - private final Map currentHistories = new HashMap<>(); - private final UnsignedLongRangeSet purgedHistories; - private final LocalHistoryIdentifier standaloneId; - private final ClientIdentifier identifier; - private final String shardName; + @Override + public FrontendClientMetadata build() { + return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of()); + } - FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) { - this.shardName = requireNonNull(shardName); - this.identifier = requireNonNull(identifier); - purgedHistories = UnsignedLongRangeSet.create(); + @Override + void onHistoryCreated(final LocalHistoryIdentifier historyId) { + // No-op + } - // History for stand-alone transactions is always present - standaloneId = standaloneHistoryId(); - currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); - } + @Override + void onHistoryClosed(final LocalHistoryIdentifier historyId) { + // No-op + } - FrontendClientMetadataBuilder(final String shardName, final FrontendClientMetadata meta) { - this.shardName = requireNonNull(shardName); - this.identifier = meta.getIdentifier(); - purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories()); + @Override + void onHistoryPurged(final LocalHistoryIdentifier historyId) { + // No-op + } - for (FrontendHistoryMetadata h : meta.getCurrentHistories()) { - final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(identifier, h); - currentHistories.put(b.getIdentifier(), b); + @Override + void onTransactionAborted(final TransactionIdentifier txId) { + // No-op } - // Sanity check and recovery - standaloneId = standaloneHistoryId(); - if (!currentHistories.containsKey(standaloneId)) { - LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery", - shardName, identifier, currentHistories); - currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); + @Override + void onTransactionCommitted(final TransactionIdentifier txId) { + // No-op } - } - private LocalHistoryIdentifier standaloneHistoryId() { - return new LocalHistoryIdentifier(identifier, 0); - } + @Override + void onTransactionPurged(final TransactionIdentifier txId) { + // No-op + } - @Override - public FrontendClientMetadata build() { - return new FrontendClientMetadata(identifier, purgedHistories.toImmutable(), - Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build)); + @Override + LeaderFrontendState toLeaderState(final Shard shard) { + return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore()); + } } - @Override - public ClientIdentifier getIdentifier() { - return identifier; - } + static final class Enabled extends FrontendClientMetadataBuilder { + + private final Map currentHistories = new HashMap<>(); + private final UnsignedLongRangeSet purgedHistories; + private final LocalHistoryIdentifier standaloneId; + + Enabled(final String shardName, final ClientIdentifier identifier) { + super(shardName, identifier); + + purgedHistories = UnsignedLongRangeSet.create(); - void onHistoryCreated(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId); - final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta); - if (oldMeta != null) { - // This should not be happening, warn about it - LOG.warn("{}: Reused local history {}", shardName, historyId); - } else { - LOG.debug("{}: Created local history {}", shardName, historyId); + // History for stand-alone transactions is always present + standaloneId = standaloneHistoryId(); + currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); } - } - void onHistoryClosed(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId); - if (builder != null) { - builder.onHistoryClosed(); - LOG.debug("{}: Closed history {}", shardName, historyId); - } else { - LOG.warn("{}: Closed unknown history {}, ignoring", shardName, historyId); + Enabled(final String shardName, final FrontendClientMetadata meta) { + super(shardName, meta.getIdentifier()); + + purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories()); + for (FrontendHistoryMetadata h : meta.getCurrentHistories()) { + final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h); + currentHistories.put(b.getIdentifier(), b); + } + + // Sanity check and recovery + standaloneId = standaloneHistoryId(); + if (!currentHistories.containsKey(standaloneId)) { + LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery", + shardName, getIdentifier(), currentHistories); + currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId)); + } + } + + @Override + public FrontendClientMetadata build() { + return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(), + Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build)); + } + + @Override + void onHistoryCreated(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId); + final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta); + if (oldMeta != null) { + // This should not be happening, warn about it + LOG.warn("{}: Reused local history {}", shardName(), historyId); + } else { + LOG.debug("{}: Created local history {}", shardName(), historyId); + } } - } - void onHistoryPurged(final LocalHistoryIdentifier historyId) { - final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId); - final long historyBits = historyId.getHistoryId(); - if (history == null) { - if (!purgedHistories.contains(historyBits)) { + @Override + void onHistoryClosed(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId); + if (builder != null) { + builder.onHistoryClosed(); + LOG.debug("{}: Closed history {}", shardName(), historyId); + } else { + LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId); + } + } + + @Override + void onHistoryPurged(final LocalHistoryIdentifier historyId) { + final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId); + final long historyBits = historyId.getHistoryId(); + if (history == null) { + if (!purgedHistories.contains(historyBits)) { + purgedHistories.add(historyBits); + LOG.warn("{}: Purging unknown history {}", shardName(), historyId); + } else { + LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId); + } + } else { purgedHistories.add(historyBits); - LOG.warn("{}: Purging unknown history {}", shardName, historyId); + LOG.debug("{}: Purged history {}", shardName(), historyId); + } + } + + @Override + void onTransactionAborted(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionAborted(txId); + LOG.debug("{}: Aborted transaction {}", shardName(), txId); } else { - LOG.warn("{}: Duplicate purge of history {}", shardName, historyId); + LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId); } - } else { - purgedHistories.add(historyBits); - LOG.debug("{}: Purged history {}", shardName, historyId); } - } - void onTransactionAborted(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionAborted(txId); - LOG.debug("{}: Aborted transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName, txId); + @Override + void onTransactionCommitted(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionCommitted(txId); + LOG.debug("{}: Committed transaction {}", shardName(), txId); + } else { + LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId); + } } - } - void onTransactionCommitted(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionCommitted(txId); - LOG.debug("{}: Committed transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName, txId); + @Override + void onTransactionPurged(final TransactionIdentifier txId) { + final FrontendHistoryMetadataBuilder history = getHistory(txId); + if (history != null) { + history.onTransactionPurged(txId); + LOG.debug("{}: Purged transaction {}", shardName(), txId); + } else { + LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId); + } } - } - void onTransactionPurged(final TransactionIdentifier txId) { - final FrontendHistoryMetadataBuilder history = getHistory(txId); - if (history != null) { - history.onTransactionPurged(txId); - LOG.debug("{}: Purged transaction {}", shardName, txId); - } else { - LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName, txId); + @Override + LeaderFrontendState toLeaderState(final Shard shard) { + // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower + // interactions would get intertwined leading to inconsistencies. + final Map histories = new HashMap<>(); + for (FrontendHistoryMetadataBuilder e : currentHistories.values()) { + if (e.getIdentifier().getHistoryId() != 0) { + final AbstractFrontendHistory state = e.toLeaderState(shard); + verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state); + histories.put(e.getIdentifier(), (LocalFrontendHistory) state); + } + } + + final AbstractFrontendHistory singleHistory; + final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get( + new LocalHistoryIdentifier(getIdentifier(), 0)); + if (singleHistoryMeta == null) { + final ShardDataTree tree = shard.getDataStore(); + singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree); + } else { + singleHistory = singleHistoryMeta.toLeaderState(shard); + } + + return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(), + purgedHistories.copy(), singleHistory, histories); } - } - /** - * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart. - * - * @param shard parent shard - * @return Leader frontend state - */ - @NonNull LeaderFrontendState toLeaderState(final @NonNull Shard shard) { - // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower - // interactions would get intertwined leading to inconsistencies. - final Map histories = new HashMap<>(); - for (FrontendHistoryMetadataBuilder e : currentHistories.values()) { - if (e.getIdentifier().getHistoryId() != 0) { - final AbstractFrontendHistory state = e.toLeaderState(shard); - verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state); - histories.put(e.getIdentifier(), (LocalFrontendHistory) state); + @Override + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories); + } + + private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) { + LocalHistoryIdentifier historyId = txId.getHistoryId(); + if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) { + // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup + // needs to account for that. + LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId); + historyId = standaloneId; } + + return currentHistories.get(historyId); } - final AbstractFrontendHistory singleHistory; - final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get( - new LocalHistoryIdentifier(identifier, 0)); - if (singleHistoryMeta == null) { - final ShardDataTree tree = shard.getDataStore(); - singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree); - } else { - singleHistory = singleHistoryMeta.toLeaderState(shard); + private LocalHistoryIdentifier standaloneHistoryId() { + return new LocalHistoryIdentifier(getIdentifier(), 0); } + } - return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(), - purgedHistories.copy(), singleHistory, histories); + private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class); + + private final ClientIdentifier identifier; + private final String shardName; + + FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) { + this.shardName = requireNonNull(shardName); + this.identifier = requireNonNull(identifier); } - private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) { - LocalHistoryIdentifier historyId = txId.getHistoryId(); - if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) { - // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup - // needs to account for that. - LOG.debug("{}: looking up {} instead of {}", shardName, standaloneId, historyId); - historyId = standaloneId; - } + static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) { + final Collection current = meta.getCurrentHistories(); + final RangeSet purged = meta.getPurgedHistories(); + + // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history -- + // either purged or active + return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier()) + : new Enabled(shardName, meta); + } + + @Override + public final ClientIdentifier getIdentifier() { + return identifier; + } - return currentHistories.get(historyId); + final String shardName() { + return shardName; } + abstract void onHistoryCreated(LocalHistoryIdentifier historyId); + + abstract void onHistoryClosed(LocalHistoryIdentifier historyId); + + abstract void onHistoryPurged(LocalHistoryIdentifier historyId); + + abstract void onTransactionAborted(TransactionIdentifier txId); + + abstract void onTransactionCommitted(TransactionIdentifier txId); + + abstract void onTransactionPurged(TransactionIdentifier txId); + + /** + * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart. + * + * @param shard parent shard + * @return Leader frontend state + */ + abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard); + @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("identifier", identifier).add("current", currentHistories) - .add("purged", purgedHistories).toString(); + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper helper) { + return helper.add("identifier", identifier); } }