- @Override
- CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
- modification, coordinated);
- modification = new FailedDataTreeModification(this::submittedException);
- return ret;
- }
-
- @Override
- void doSeal() {
- modification.ready();
- }
-
- DataTreeSnapshot getSnapshot() {
- return modification;
- }
-
- private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
- final @Nullable Consumer<Response<?, ?>> callback) {
- for (TransactionModification mod : request.getModifications()) {
- if (mod instanceof TransactionWrite) {
- modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
- } else if (mod instanceof TransactionMerge) {
- modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
- } else if (mod instanceof TransactionDelete) {
- modification.delete(mod.getPath());
- } else {
- throw new IllegalArgumentException("Unsupported modification " + mod);
- }
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
- if (maybeProtocol.isPresent()) {
- seal();
- Verify.verify(callback != null, "Request {} has null callback", request);
-
- switch (maybeProtocol.get()) {
- case ABORT:
- sendAbort(callback);
- break;
- case SIMPLE:
- sendRequest(commitRequest(false), callback);
- break;
- case THREE_PHASE:
- sendRequest(commitRequest(true), callback);
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
- }
- }
- }
-