- private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
- request.getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
- final long now) throws RequestException {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (readyCohort == null) {
- openTransaction.abort();
- return new TransactionAbortSuccess(id, request.getSequence());
- }
-
- readyCohort.abort(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- readyCohort = null;
- recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
- LOG.debug("Transaction {} aborted", id);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- readyCohort = null;
- LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
- }
- });
- return null;
- }
-
- private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void directCommit(final RequestEnvelope envelope, final long now) {
- readyCohort.canCommit(new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope, now);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
- readyCohort = null;
- }
- });
-
- }
-
- private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
- @Override
- public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
- readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
- @Override
- public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope, startTime);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
- readyCohort = null;
- }
- });
- }
-
- private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
- recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
- envelope.getMessage().getSequence()));
- readyCohort = null;
- }
-
- private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
- readyCohort = history.createReadyCohort(id, sealedModification);
-
- if (request.isCoordinated()) {
- coordinatedCommit(envelope, now);
- } else {
- directCommit(envelope, now);
- }
- } else {
- throw new UnsupportedRequestException(request);
- }
- }
-
- private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
- data.isPresent()));
- }
-
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
- throws RequestException {
- final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
- return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
- }
-
- private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
- return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence));
- }
-
- private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope, final long now) throws RequestException {
-
- final DataTreeModification modification = openTransaction.getSnapshot();
- for (TransactionModification m : request.getModifications()) {
- if (m instanceof TransactionDelete) {
- modification.delete(m.getPath());
- } else if (m instanceof TransactionWrite) {
- modification.write(m.getPath(), ((TransactionWrite) m).getData());
- } else if (m instanceof TransactionMerge) {
- modification.merge(m.getPath(), ((TransactionMerge) m).getData());
- } else {
- LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
- }
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
- if (!maybeProto.isPresent()) {
- return replyModifySuccess(request.getSequence());
- }
-
- switch (maybeProto.get()) {
- case ABORT:
- openTransaction.abort();
- openTransaction = null;
- return replyModifySuccess(request.getSequence());
- case SIMPLE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- directCommit(envelope, now);
- return null;
- case THREE_PHASE:
- readyCohort = openTransaction.ready();
- openTransaction = null;
- coordinatedCommit(envelope, now);
- return null;
- default:
- throw new UnsupportedRequestException(request);
- }