abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
+ abstract void replayModifyTransactionRequest(ModifyTransactionRequest request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
@Override
final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
}
@Override
- void handleForwardedLocalRequest(final AbstractLocalTransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) {
+ void handleReplayedLocalRequest(final AbstractLocalTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
if (request instanceof AbortLocalTransactionRequest) {
- sendAbort(request, callback);
+ enqueueAbort(request, callback, enqueuedTicks);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
}
- @Override
- void handleForwardedRemoteRequest(final TransactionRequest<?> request,
+ private boolean handleReadRequest(final TransactionRequest<?> request,
final @Nullable Consumer<Response<?, ?>> callback) {
- if (request instanceof ModifyTransactionRequest) {
- applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
- } else if (request instanceof ReadTransactionRequest) {
+ if (request instanceof ReadTransactionRequest) {
final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
} else if (request instanceof ExistsTransactionRequest) {
final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
final boolean result = readOnlyView().readNode(path).isPresent();
callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ void handleReplayedRemoteRequest(final TransactionRequest<?> request,
+ final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ if (request instanceof ModifyTransactionRequest) {
+ replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
} else if (request instanceof TransactionPurgeRequest) {
- purge();
+ enqueuePurge(enqueuedTicks);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+
+ /**
+ * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)},
+ * except it is invoked in the forwarding path from
+ * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}.
+ *
+ * @param request Forwarded request
+ * @param callback Callback to be invoked once the request completes
+ */
+ void handleForwardedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ if (request instanceof ModifyTransactionRequest) {
+ applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+ } else if (handleReadRequest(request, callback)) {
+ // No-op
+ } else if (request instanceof TransactionPurgeRequest) {
+ sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.purge();
+ successor.sendPurge();
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
sendRequest(request, callback);
}
+
+ void enqueueAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ enqueueRequest(request, callback, enqueuedTicks);
+ }
}