}
// Sequence has already been checked
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
- throws RequestException {
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
if (request instanceof ModifyTransactionRequest) {
- return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+ return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
} else if (request instanceof CommitLocalTransactionRequest) {
- handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+ handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
return null;
} else if (request instanceof ExistsTransactionRequest) {
return handleExistsTransaction((ExistsTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionPreCommitRequest) {
- handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+ handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionDoCommitRequest) {
- handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+ handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort((TransactionAbortRequest) request, envelope);
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
return null;
} else {
throw new UnsupportedRequestException(request);
return success;
}
- private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+ private long executionTime(final long startTime) {
+ return history.readTime() - startTime;
+ }
+
+ private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+ final TransactionSuccess<?> success) {
recordResponse(success.getSequence(), success);
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, executionTime(startTime));
}
- private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+ private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+ final RuntimeRequestException failure) {
recordResponse(envelope.getMessage().getSequence(), failure);
- envelope.sendFailure(failure);
+ envelope.sendFailure(failure, executionTime(startTime));
}
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
request.getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
readyCohort = null;
}
});
}
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
readyCohort = null;
}
});
}
- private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort = null;
- recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
LOG.debug("Transaction {} aborted", id);
}
public void onFailure(final Throwable failure) {
readyCohort = null;
LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
}
});
}
- private void coordinatedCommit(final RequestEnvelope envelope) {
+ private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
- private void directCommit(final RequestEnvelope envelope) {
+ private void directCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope);
+ successfulDirectCanCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+ private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope);
+ successfulDirectPreCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+ private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulCommit(final RequestEnvelope envelope) {
- recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
readyCohort = null;
}
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
if (sealedModification.equals(request.getModification())) {
readyCohort = history.createReadyCohort(id, sealedModification);
if (request.isCoordinated()) {
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
} else {
- directCommit(envelope);
+ directCommit(envelope, now);
}
} else {
throw new UnsupportedRequestException(request);
data.isPresent()));
}
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+ throws RequestException {
final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
}
}
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
final DataTreeModification modification = openTransaction.getSnapshot();
for (TransactionModification m : request.getModifications()) {
case SIMPLE:
readyCohort = openTransaction.ready();
openTransaction = null;
- directCommit(envelope);
+ directCommit(envelope, now);
return null;
case THREE_PHASE:
readyCohort = openTransaction.ready();
openTransaction = null;
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
return null;
default:
throw new UnsupportedRequestException(request);