try {
if (request instanceof CreateLocalHistoryRequest) {
- return handleCreateHistory((CreateLocalHistoryRequest) request);
+ return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
} else if (request instanceof DestroyLocalHistoryRequest) {
return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
}
}
- private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
- final LocalHistoryIdentifier id = request.getTarget();
- final AbstractFrontendHistory existing = localHistories.get(id);
+ private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ final LocalHistoryIdentifier historyId = request.getTarget();
+ final AbstractFrontendHistory existing = localHistories.get(historyId);
if (existing != null) {
// History already exists: report success
- LOG.debug("{}: history {} already exists", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ LOG.debug("{}: history {} already exists", persistenceId, historyId);
+ return new LocalHistorySuccess(historyId, request.getSequence());
}
// We have not found the history. Before we create it we need to check history ID sequencing so that we do not
// end up resurrecting a purged history.
- if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+ if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
LOG.debug("{}: rejecting purged request {}", persistenceId, request);
throw new DeadHistoryException(purgedHistories);
}
// Update last history we have seen
- if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
- lastSeenHistory = id.getHistoryId();
+ if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+ lastSeenHistory = historyId.getHistoryId();
}
- localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
- LOG.debug("{}: created history {}", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ // We have to send the response only after persistence has completed
+ final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+ LOG.debug("{}: persisted history {}", persistenceId, historyId);
+ envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+ });
+
+ localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+ LOG.debug("{}: created history {}", persistenceId, historyId);
+ return null;
}
private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
return ret;
}
- ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+ ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+ @Nullable final Runnable callback) {
ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
if (chain == null) {
chain = new ShardDataTreeTransactionChain(historyId, this);
transactionChains.put(historyId, chain);
- replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
+ replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+ } else if (callback != null) {
+ callback.run();
}
return chain;
return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
}
- return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
}
ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
.newModification());
}
- return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+ return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
}
@VisibleForTesting
return createReadyCohort(txId, mod);
}
- return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+ return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
}
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")