+
+ @Override
+ void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+ super.replaySuccessfulRequests(successor);
+
+ for (TransactionRequest<?> req : successfulRequests) {
+ LOG.debug("Forwarding request {} to successor {}", req, successor);
+ successor.handleForwardedRemoteRequest(req, null);
+ }
+ successfulRequests.clear();
+ }
+
+ @Override
+ void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ successor.handleForwardedRequest(request, callback);
+ }
+
+ private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
+ throws RequestException {
+ if (request instanceof ModifyTransactionRequest) {
+ final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
+
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ seal();
+
+ switch (maybeProto.get()) {
+ case ABORT:
+ sendAbort(callback);
+ break;
+ case SIMPLE:
+ sendRequest(commitRequest(false), callback);
+ break;
+ case THREE_PHASE:
+ sendRequest(commitRequest(true), callback);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("Unhandled request {}" + request);
+ }
+ }
+
+ @Override
+ void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) throws RequestException {
+ successor.handleForwardedRemoteRequest(request, callback);
+ }