- private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
- request.getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
- final long now) throws RequestException {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (readyCohort == null) {
- openTransaction.abort();
- return new TransactionAbortSuccess(id, request.getSequence());
- }
-
- readyCohort.abort(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- readyCohort = null;
- recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
- LOG.debug("Transaction {} aborted", id);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- readyCohort = null;
- LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
- }
- });
- return null;
- }
-
- private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void directCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
-
- }
-
- private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
- recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- readyCohort = null;
- }
-
- private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
- readyCohort = history.createReadyCohort(id, sealedModification);
-
- if (request.isCoordinated()) {
- coordinatedCommit(envelope, now);
- } else {
- directCommit(envelope, now);
- }
- } else {
- throw new UnsupportedRequestException(request);
- }
- }
-
- private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
- data.isPresent()));
- }
-
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
- }
-
- private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
- if (cachedModifySuccess == null) {
- cachedModifySuccess = new ModifyTransactionSuccess(id, sequence);
- }
-
- return recordSuccess(sequence, cachedModifySuccess);