private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
// Histories which have not been purged
- private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+ private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
// RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+ private final RangeSet<UnsignedLong> purgedHistories;
// Used for all standalone transactions
private final AbstractFrontendHistory standaloneHistory;
private long expectedTxSequence;
private Long lastSeenHistory = null;
-
// TODO: explicit failover notification
// Record the ActorRef for the originating actor and when we switch to being a leader send a notification
// to the frontend client -- that way it can immediately start sending requests
// - per-RequestException throw counters
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+ this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+ clientId, tree), new HashMap<>());
+ }
+
+ LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+ final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
- standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
+ this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+ this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+ this.localHistories = Preconditions.checkNotNull(localHistories);
}
@Override
} else if (request instanceof PurgeLocalHistoryRequest) {
return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
} else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
throw new UnsupportedRequestException(request);
}
} finally {
}
// Update last history we have seen
- if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
+ if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
lastSeenHistory = id.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
+ localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}