Direct transaction abort path can end up touching proxy history's
maps, which it should not, as that happens only after purge. This
inconsistency has cropped up when purge was introduced.
Refactor the methods so that cohorts are removed only after purge,
and fix abort request routing such that it always enqueues a purge
request (possibly via successor). This also addresses a FIXME, as
we now have an enqueueAbort() request, which is not waiting on the
queue.
Change-Id: Ie291da70ace772274f33505db376a915b38e37c0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
*/
final void abort() {
checkNotSealed();
*/
final void abort() {
checkNotSealed();
parent.abortTransaction(this);
parent.abortTransaction(this);
+
+ sendRequest(abortRequest(), resp -> {
+ LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+ sendPurge();
+ });
}
final void abort(final VotingFuture<Void> ret) {
checkSealed();
}
final void abort(final VotingFuture<Void> ret) {
checkSealed();
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
}
final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
}
final void enqueueAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ checkNotSealed();
+ parent.abortTransaction(this);
+
+ enqueueRequest(abortRequest(), resp -> {
+ LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
+ // Purge will be sent by the predecessor's callback
+ if (callback != null) {
+ callback.accept(resp);
+ }
+ }, enqueuedTicks);
+ }
+
+ final void enqueueDoAbort(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
}
enqueueRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
}
- final void sendAbort(final Consumer<Response<?, ?>> callback) {
+ final void sendDoAbort(final Consumer<Response<?, ?>> callback) {
sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
}
sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
}
- final void sendPurge() {
- successfulRequests.clear();
+ private void sendPurge() {
+ sendPurge(null);
+ }
- final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
- sendRequest(req, t -> {
- LOG.debug("Transaction {} purge completed", this);
- parent.completeTransaction(this);
- });
+ final void sendPurge(final Consumer<Response<?, ?>> callback) {
+ sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+ }
+
+ final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
- final void enqueuePurge(final long enqueuedTicks) {
+ private TransactionPurgeRequest purgeRequest() {
successfulRequests.clear();
successfulRequests.clear();
+ return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+ }
- final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
- enqueueRequest(req, t -> {
- LOG.debug("Transaction {} purge completed", this);
- parent.completeTransaction(this);
- }, enqueuedTicks);
+ private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
+ LOG.debug("Transaction {} purge completed", this);
+ parent.completeTransaction(this);
+ if (callback != null) {
+ callback.accept(resp);
+ }
}
// Called with the connection unlocked
}
// Called with the connection unlocked
- abstract void doAbort();
-
@GuardedBy("this")
abstract void flushState(AbstractProxyTransaction successor);
@GuardedBy("this")
abstract void flushState(AbstractProxyTransaction successor);
+ abstract TransactionRequest<?> abortRequest();
+
abstract TransactionRequest<?> commitRequest(boolean coordinated);
/**
abstract TransactionRequest<?> commitRequest(boolean coordinated);
/**
abstract DataTreeSnapshot readOnlyView();
abstract DataTreeSnapshot readOnlyView();
- abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
+ abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
- final void doAbort() {
- sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
- LOG.debug("Transaction {} abort completed with {}", identifier, response);
- });
+ final AbortLocalTransactionRequest abortRequest() {
+ return new AbortLocalTransactionRequest(identifier, localActor());
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
// Local transactions do not have non-replayable requests which would be visible to the backend,
// hence we can skip sequence increments.
} else if (request instanceof IncrementTransactionSequenceRequest) {
// Local transactions do not have non-replayable requests which would be visible to the backend,
// hence we can skip sequence increments.
*/
void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
*/
void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback);
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
+ successor.sendPurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
- void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback) {
final Consumer<Response<?, ?>> callback) {
- commonModifyTransactionRequest(request, callback);
+ commonModifyTransactionRequest(request);
abort();
}
@Override
void replayModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
abort();
}
@Override
void replayModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- commonModifyTransactionRequest(request, callback);
- // FIXME: this should go through the enqueueRequest() path
- abort();
+ commonModifyTransactionRequest(request);
+ enqueueAbort(callback, enqueuedTicks);
- private static void commonModifyTransactionRequest(final ModifyTransactionRequest request,
- final Consumer<Response<?, ?>> callback) {
+ private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) {
Verify.verify(request.getModifications().isEmpty());
final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
Verify.verify(request.getModifications().isEmpty());
final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
- void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
final @Nullable Consumer<Response<?, ?>> callback) {
commonModifyTransactionRequest(request, callback, this::sendRequest);
}
final @Nullable Consumer<Response<?, ?>> callback) {
commonModifyTransactionRequest(request, callback, this::sendRequest);
}
enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
enqueueRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback,
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else {
super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
}
} else {
super.handleReplayedRemoteRequest(request, callback, enqueuedTicks);
}
} else if (request instanceof TransactionDoCommitRequest) {
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
} else if (request instanceof TransactionDoCommitRequest) {
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
} else {
super.handleForwardedRemoteRequest(request, callback);
}
} else {
super.handleForwardedRemoteRequest(request, callback);
}
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
- LOG.debug("Proxy {} aborting transaction {}", this, tx);
+ // Removal will be completed once purge completes
+ LOG.debug("Proxy {} aborted transaction {}", this, tx);
onTransactionAborted(tx);
} finally {
lock.unlock();
onTransactionAborted(tx);
} finally {
lock.unlock();
isSnapshotOnly()), t -> completeRead(future, t), future);
}
isSnapshotOnly()), t -> completeRead(future, t), future);
}
- @Override
- void doAbort() {
- ensureInitializedBuilder();
- builder.setAbort();
- flushBuilder();
- }
-
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
private void ensureInitializedBuilder() {
if (!builderBusy) {
builder.setSequence(nextSequence());
recordFinishedRequest(response);
}
recordFinishedRequest(response);
}
- private ModifyTransactionRequest abortRequest() {
+ @Override
+ ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
final ModifyTransactionRequest ret = builder.build();
ensureInitializedBuilder();
builder.setAbort();
final ModifyTransactionRequest ret = builder.build();
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider();
} else if (request instanceof TransactionPurgeRequest) {
} else if (request instanceof TransactionPurgeRequest) {
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
enqueuedTicks);
} else if (request instanceof TransactionAbortRequest) {
ensureFlushedBuider(optTicks);
- enqueueAbort(callback, enqueuedTicks);
+ enqueueDoAbort(callback, enqueuedTicks);
} else if (request instanceof TransactionPurgeRequest) {
} else if (request instanceof TransactionPurgeRequest) {
- enqueuePurge(enqueuedTicks);
+ enqueuePurge(callback, enqueuedTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
ensureFlushedBuider(optTicks);
} else if (request instanceof IncrementTransactionSequenceRequest) {
final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request;
ensureFlushedBuider(optTicks);
- public void testDoAbort() throws Exception {
- transaction.doAbort();
+ public void testAbort() throws Exception {
+ transaction.abort();
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}
getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
}