+ static final class Enabled extends FrontendClientMetadataBuilder {
+
+ private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
+ private final UnsignedLongRangeSet purgedHistories;
+ private final LocalHistoryIdentifier standaloneId;
+
+ Enabled(final String shardName, final ClientIdentifier identifier) {
+ super(shardName, identifier);
+
+ purgedHistories = UnsignedLongRangeSet.create();
+
+ // History for stand-alone transactions is always present
+ standaloneId = standaloneHistoryId();
+ currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+ }
+
+ Enabled(final String shardName, final FrontendClientMetadata meta) {
+ super(shardName, meta.getIdentifier());
+
+ purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+ for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
+ final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
+ currentHistories.put(b.getIdentifier(), b);
+ }
+
+ // Sanity check and recovery
+ standaloneId = standaloneHistoryId();
+ if (!currentHistories.containsKey(standaloneId)) {
+ LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
+ shardName, getIdentifier(), currentHistories);
+ currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+ }
+ }
+
+ @Override
+ public FrontendClientMetadata build() {
+ return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(),
+ Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
+ }
+
+ @Override
+ void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+ final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+ if (oldMeta != null) {
+ // This should not be happening, warn about it
+ LOG.warn("{}: Reused local history {}", shardName(), historyId);
+ } else {
+ LOG.debug("{}: Created local history {}", shardName(), historyId);
+ }
+ }
+
+ @Override
+ void onHistoryClosed(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+ if (builder != null) {
+ builder.onHistoryClosed();
+ LOG.debug("{}: Closed history {}", shardName(), historyId);
+ } else {
+ LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId);
+ }
+ }
+
+ @Override
+ void onHistoryPurged(final LocalHistoryIdentifier historyId) {
+ final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+ final long historyBits = historyId.getHistoryId();
+ if (history == null) {
+ if (!purgedHistories.contains(historyBits)) {
+ purgedHistories.add(historyBits);
+ LOG.warn("{}: Purging unknown history {}", shardName(), historyId);
+ } else {
+ LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId);
+ }
+ } else {
+ purgedHistories.add(historyBits);
+ LOG.debug("{}: Purged history {}", shardName(), historyId);
+ }
+ }
+
+ @Override
+ void onTransactionAborted(final TransactionIdentifier txId) {
+ final FrontendHistoryMetadataBuilder history = getHistory(txId);
+ if (history != null) {
+ history.onTransactionAborted(txId);
+ LOG.debug("{}: Aborted transaction {}", shardName(), txId);
+ } else {
+ LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId);
+ }
+ }