+
+ DataTreeSnapshot getSnapshot() {
+ return modification;
+ }
+
+ private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ for (TransactionModification mod : request.getModifications()) {
+ if (mod instanceof TransactionWrite) {
+ modification.write(mod.getPath(), ((TransactionWrite)mod).getData());
+ } else if (mod instanceof TransactionMerge) {
+ modification.merge(mod.getPath(), ((TransactionMerge)mod).getData());
+ } else if (mod instanceof TransactionDelete) {
+ modification.delete(mod.getPath());
+ } else {
+ throw new IllegalArgumentException("Unsupported modification " + mod);
+ }
+ }
+
+ final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
+ if (maybeProtocol.isPresent()) {
+ seal();
+ Verify.verify(callback != null, "Request {} has null callback", request);
+
+ switch (maybeProtocol.get()) {
+ case ABORT:
+ sendAbort(callback);
+ break;
+ case SIMPLE:
+ sendRequest(commitRequest(false), callback);
+ break;
+ case THREE_PHASE:
+ sendRequest(commitRequest(true), callback);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get());
+ }
+ }
+ }
+
+ @Override
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback) {
+ LOG.debug("Applying forwaded request {}", request);
+
+ if (request instanceof ModifyTransactionRequest) {
+ applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ if (request instanceof CommitLocalTransactionRequest) {
+ final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
+ final DataTreeModification mod = req.getModification();
+
+ LOG.debug("Applying modification {} to successor {}", mod, successor);
+ mod.applyToCursor(new AbstractDataTreeModificationCursor() {
+ @Override
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.write(current().node(child), data);
+ }
+
+ @Override
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+ successor.merge(current().node(child), data);
+ }
+
+ @Override
+ public void delete(final PathArgument child) {
+ successor.delete(current().node(child));
+ }
+ });
+
+ successor.seal();
+
+ final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
+ successor.sendRequest(successorReq, callback);
+ } else if (request instanceof AbortLocalTransactionRequest) {
+ LOG.debug("Forwarding abort {} to successor {}", request, successor);
+ successor.abort();
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+ }
+
+ @Override
+ void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ if (request instanceof AbortLocalTransactionRequest) {
+ successor.sendAbort(request, callback);
+ } else if (request instanceof CommitLocalTransactionRequest) {
+ successor.sendCommit((CommitLocalTransactionRequest)request, callback);
+ } else {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+
+ LOG.debug("Forwarded request {} to successor {}", request, successor);
+ }
+
+ private void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ sendRequest(request, callback);
+ modification = new FailedDataTreeModification(this::abortedException);
+ }
+
+ private void sendCommit(final CommitLocalTransactionRequest request, final Consumer<Response<?, ?>> callback) {
+ // Rebase old modification on new data tree.
+ try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) {
+ request.getModification().applyToCursor(cursor);
+ }
+
+ seal();
+ sendRequest(commitRequest(request.isCoordinated()), callback);
+ }