*/
final void abort() {
checkNotSealed();
- doAbort();
parent.abortTransaction(this);
+
+ sendRequest(abortRequest(), resp -> {
+ LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+ sendPurge();
+ });
}
final void abort(final VotingFuture<Void> ret) {
checkSealed();
- sendAbort(t -> {
+ sendDoAbort(t -> {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
}
final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ checkNotSealed();
+ parent.abortTransaction(this);
+
+ enqueueRequest(abortRequest(), resp -> {
+ LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+ // Purge will be sent by the predecessor's callback
+ if (callback != null) {
+ callback.accept(resp);
+ }
+ }, enqueuedTicks);
+ }
+
+ final void enqueueDoAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
}
- final void sendAbort(final Consumer<Response<?, ?>> callback) {
+ final void sendDoAbort(final Consumer<Response<?, ?>> callback) {
sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
}
});
}
- final void sendPurge() {
- successfulRequests.clear();
+ private void sendPurge() {
+ sendPurge(null);
+ }
- final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
- sendRequest(req, t -> {
- LOG.debug("Transaction {} purge completed", this);
- parent.completeTransaction(this);
- });
+ final void sendPurge(final Consumer<Response<?, ?>> callback) {
+ sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+ }
+
+ final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
}
- final void enqueuePurge(final long enqueuedTicks) {
+ private TransactionPurgeRequest purgeRequest() {
successfulRequests.clear();
+ return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+ }
- final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
- enqueueRequest(req, t -> {
- LOG.debug("Transaction {} purge completed", this);
- parent.completeTransaction(this);
- }, enqueuedTicks);
+ private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
+ LOG.debug("Transaction {} purge completed", this);
+ parent.completeTransaction(this);
+ if (callback != null) {
+ callback.accept(resp);
+ }
}
// Called with the connection unlocked
abstract void doSeal();
- abstract void doAbort();
-
@GuardedBy("this")
abstract void flushState(AbstractProxyTransaction successor);
+ abstract TransactionRequest<?> abortRequest();
+
abstract TransactionRequest<?> commitRequest(boolean coordinated);
/**
abstract DataTreeSnapshot readOnlyView();
- abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+ abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
}
@Override
- final void doAbort() {
- sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
- response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
+ final AbortLocalTransactionRequest abortRequest() {
+ return new AbortLocalTransactionRequest(identifier, localActor());
}
@Override
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
// Local transactions do not have non-replayable requests which would be visible to the backend,
// hence we can skip sequence increments.
*/
void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.sendPurge();
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.sendPurge();
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
}
@Override
- void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback) {
- commonModifyTransactionRequest(request, callback);
+ commonModifyTransactionRequest(request);
abort();
}
@Override
void replayModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- commonModifyTransactionRequest(request, callback);
- // FIXME: this should go through the enqueueRequest() path
- abort();
+ commonModifyTransactionRequest(request);
+ enqueueAbort(callback, enqueuedTicks);
}
- private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
- final Consumer<Response<?, ?>> callback) {
+ private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) {
Verify.verify(request.getModifications().isEmpty());
final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
}
@Override
- void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
final @Nullable Consumer<Response<?, ?>> callback) {
commonModifyTransactionRequest(request, callback, this::sendRequest);
}
enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else {
super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
}
} else if (request instanceof TransactionDoCommitRequest) {
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
- sendAbort(callback);
+ sendDoAbort(callback);
} else {
super.handleForwardedRemoteRequest(request, callback);
}
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
- LOG.debug("Proxy {} aborting transaction {}", this, tx);
+ // Removal will be completed once purge completes
+ LOG.debug("Proxy {} aborted transaction {}", this, tx);
onTransactionAborted(tx);
} finally {
lock.unlock();
isSnapshotOnly()), t -> completeRead(future, t), future);
}
- @Override
- void doAbort() {
- ensureInitializedBuilder();
- builder.setAbort();
- flushBuilder();
- }
-
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
recordFinishedRequest(response);
}
- private ModifyTransactionRequest abortRequest() {
+ @Override
+ ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
final ModifyTransactionRequest ret = builder.build();
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
- sendAbort(callback);
+ sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
- sendPurge();
+ sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
ensureFlushedBuider(optTicks);
}
@Test
- public void testDoAbort() throws Exception {
- transaction.doAbort();
+ public void testAbort() throws Exception {
+ transaction.abort();
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}