abstract DataTreeSnapshot readOnlyView();
- abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+ abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
}
@Override
- final void doAbort() {
- sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
- response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
+ final AbortLocalTransactionRequest abortRequest() {
+ return new AbortLocalTransactionRequest(identifier, localActor());
}
@Override
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
// Local transactions do not have non-replayable requests which would be visible to the backend,
// hence we can skip sequence increments.
*/
void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.sendPurge();
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.sendPurge();
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}