CreateLocalHistoryRequest needs to be replicated to followers before
we respond to the frontend, as logically this request has to be
persisted before any subsequent transactions.
While the frontend could replay the request on reconnect, it would
also have to track the implied persistence (via child transactions),
which we do not want because it really is a backend detail and it
would lead to a lot of complexity in the frontend.
Change-Id: Icdfad59d3c2bab3d4125186c6a9b3c901d3934f6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
8f18717f60e58eebf726fe0611859311fa83df44)
try {
if (request instanceof CreateLocalHistoryRequest) {
try {
if (request instanceof CreateLocalHistoryRequest) {
- return handleCreateHistory((CreateLocalHistoryRequest) request);
+ return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
} else if (request instanceof DestroyLocalHistoryRequest) {
return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
} else if (request instanceof DestroyLocalHistoryRequest) {
return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
- private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
- final LocalHistoryIdentifier id = request.getTarget();
- final AbstractFrontendHistory existing = localHistories.get(id);
+ private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ final LocalHistoryIdentifier historyId = request.getTarget();
+ final AbstractFrontendHistory existing = localHistories.get(historyId);
if (existing != null) {
// History already exists: report success
if (existing != null) {
// History already exists: report success
- LOG.debug("{}: history {} already exists", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ LOG.debug("{}: history {} already exists", persistenceId, historyId);
+ return new LocalHistorySuccess(historyId, request.getSequence());
}
// We have not found the history. Before we create it we need to check history ID sequencing so that we do not
// end up resurrecting a purged history.
}
// We have not found the history. Before we create it we need to check history ID sequencing so that we do not
// end up resurrecting a purged history.
- if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+ if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
LOG.debug("{}: rejecting purged request {}", persistenceId, request);
throw new DeadHistoryException(purgedHistories);
}
// Update last history we have seen
LOG.debug("{}: rejecting purged request {}", persistenceId, request);
throw new DeadHistoryException(purgedHistories);
}
// Update last history we have seen
- if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
- lastSeenHistory = id.getHistoryId();
+ if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+ lastSeenHistory = historyId.getHistoryId();
- localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
- LOG.debug("{}: created history {}", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ // We have to send the response only after persistence has completed
+ final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+ LOG.debug("{}: persisted history {}", persistenceId, historyId);
+ envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+ });
+
+ localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+ LOG.debug("{}: created history {}", persistenceId, historyId);
+ return null;
}
private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
}
private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
}
static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
}
static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
- final LocalHistoryIdentifier historyId) {
- return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
- TreeRangeSet.create());
+ final ShardDataTreeTransactionChain chain) {
+ return new LocalFrontendHistory(persistenceId, tree, chain, ImmutableMap.of(), TreeRangeSet.create());
}
static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
}
static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
- ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+ @Nullable final Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ } else if (callback != null) {
+ callback.run();
return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
- return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
- return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
return createReadyCohort(txId, mod);
}
return createReadyCohort(txId, mod);
}
- return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+ return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")