void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
-
- req.getModifications().forEach(this::appendModification);
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- case READY:
- tmp = readyRequest();
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
}
}
+ private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ case READY:
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
+
@Override
void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
- for (TransactionModification mod : req.getModifications()) {
- appendModification(mod, optTicks);
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case READY:
- tmp = readyRequest();
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
throw new IllegalArgumentException("Unhandled request {}" + request);
}
}
+
+ private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case READY:
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
}