+ if (sendReadyOnSeal) {
+ ensureInitializedBuilder();
+ builder.setReady();
+ flushBuilder();
+ }
+ }
+
+ @Override
+ void flushState(final AbstractProxyTransaction successor) {
+ if (builderBusy) {
+ final ModifyTransactionRequest request = builder.build();
+ builderBusy = false;
+ successor.handleForwardedRemoteRequest(request, null);
+ }
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ successor.handleForwardedRequest(request, callback);
+ }
+
+ private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ if (request instanceof ModifyTransactionRequest) {
+ final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ ensureSealed();
+
+ switch (maybeProto.get()) {
+ case ABORT:
+ sendAbort(callback);
+ break;
+ case SIMPLE:
+ sendRequest(commitRequest(false), callback);
+ break;
+ case THREE_PHASE:
+ sendRequest(commitRequest(true), callback);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ } else if (request instanceof ReadTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+ } else if (request instanceof ExistsTransactionRequest) {
+ ensureFlushedBuider();
+ sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+ ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback);
+ } else if (request instanceof TransactionPreCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionDoCommitRequest) {
+ ensureFlushedBuider();
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ } else if (request instanceof TransactionAbortRequest) {
+ ensureFlushedBuider();
+ sendAbort(callback);
+ } else {
+ throw new IllegalArgumentException("Unhandled request {}" + request);
+ }
+ }
+
+ @Override
+ void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ successor.handleForwardedRemoteRequest(request, callback);