abstract class LocalProxyTransaction extends AbstractProxyTransaction {
private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
- private final TransactionIdentifier identifier;
+ private final @NonNull TransactionIdentifier identifier;
LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) {
super(parent, isDone);
@Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
@Override
- final FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
+ FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
return FluentFutures.immediateBooleanFluentFuture(readOnlyView().readNode(path).isPresent());
}
@Override
- final FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+ FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path));
}
}
}
- private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
- // Note we delay completion of read requests to limit the scope at which the client can run, as they have
- // listeners, which we do not want to execute while we are reconnecting.
- if (request instanceof ReadTransactionRequest) {
- if (callback != null) {
- final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
- final Optional<NormalizedNode> result = readOnlyView().readNode(path);
- executeInActor(() -> callback.accept(new ReadTransactionSuccess(request.getTarget(),
- request.getSequence(), result)));
- }
- return true;
- } else if (request instanceof ExistsTransactionRequest) {
- if (callback != null) {
- final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
- final boolean result = readOnlyView().readNode(path).isPresent();
- executeInActor(() -> callback.accept(new ExistsTransactionSuccess(request.getTarget(),
- request.getSequence(), result)));
- }
- return true;
- } else {
- return false;
- }
- }
-
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
final long enqueuedTicks) {
}
}
+ @NonNull Response<?, ?> handleExistsRequest(final @NonNull DataTreeSnapshot snapshot,
+ final @NonNull ExistsTransactionRequest request) {
+ return new ExistsTransactionSuccess(request.getTarget(), request.getSequence(),
+ snapshot.readNode(request.getPath()).isPresent());
+ }
+
+ @NonNull Response<?, ?> handleReadRequest(final @NonNull DataTreeSnapshot snapshot,
+ final @NonNull ReadTransactionRequest request) {
+ return new ReadTransactionSuccess(request.getTarget(), request.getSequence(),
+ snapshot.readNode(request.getPath()));
+ }
+
+ private boolean handleReadRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ // Note we delay completion of read requests to limit the scope at which the client can run, as they have
+ // listeners, which we do not want to execute while we are reconnecting.
+ if (request instanceof ReadTransactionRequest) {
+ if (callback != null) {
+ final var response = handleReadRequest(readOnlyView(), (ReadTransactionRequest) request);
+ executeInActor(() -> callback.accept(response));
+ }
+ return true;
+ } else if (request instanceof ExistsTransactionRequest) {
+ if (callback != null) {
+ final var response = handleExistsRequest(readOnlyView(), (ExistsTransactionRequest) request);
+ executeInActor(() -> callback.accept(response));
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
@Override
final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
+import com.google.common.util.concurrent.FluentFuture;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.BiConsumer;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
private Exception recordedFailure;
LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
- final DataTreeSnapshot snapshot) {
+ final DataTreeSnapshot snapshot) {
super(parent, identifier, false);
modification = (CursorAwareDataTreeModification) snapshot.newModification();
}
return getModification();
}
+ @Override
+ FluentFuture<Boolean> doExists(final YangInstanceIdentifier path) {
+ final var ex = recordedFailure;
+ return ex == null ? super.doExists(path)
+ : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
+ }
+
+ @Override
+ FluentFuture<Optional<NormalizedNode>> doRead(final YangInstanceIdentifier path) {
+ final var ex = recordedFailure;
+ return ex == null ? super.doRead(path)
+ : FluentFutures.immediateFailedFluentFuture(ReadFailedException.MAPPER.apply(ex));
+ }
+
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
void doDelete(final YangInstanceIdentifier path) {
}
}
+ @Override
+ Response<?, ?> handleExistsRequest(final DataTreeSnapshot snapshot, final ExistsTransactionRequest request) {
+ final var ex = recordedFailure;
+ return ex == null ? super.handleExistsRequest(snapshot, request)
+ : request.toRequestFailure(
+ new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
+ }
+
+ @Override
+ Response<?, ?> handleReadRequest(final DataTreeSnapshot snapshot, final ReadTransactionRequest request) {
+ final var ex = recordedFailure;
+ return ex == null ? super.handleReadRequest(snapshot, request)
+ : request.toRequestFailure(
+ new RuntimeRequestException("Previous modification failed", ReadFailedException.MAPPER.apply(ex)));
+ }
+
@Override
void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {