X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=0f75c747f02273bd11d278b0432199ae4448de9e;hb=e4673e2f5f64daaed0ce44680f425fb1171b1fee;hp=824a2f9b31ebed9c0f02f69c366061002604db92;hpb=abaef4a5ae37f27542155457fe7306a4662b1eeb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 824a2f9b31..0f75c747f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -49,7 +49,7 @@ import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,8 +205,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private Exception recordFailedResponse(final Response response) { final Exception failure; - if (response instanceof RequestFailure) { - final RequestException cause = ((RequestFailure) response).getCause(); + if (response instanceof RequestFailure requestFailure) { + final RequestException cause = requestFailure.getCause(); failure = cause instanceof RequestTimeoutException ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause; } else { @@ -230,8 +230,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Response response) { LOG.debug("Exists request for {} completed with {}", path, response); - if (response instanceof ExistsTransactionSuccess) { - future.set(((ExistsTransactionSuccess) response).getExists()); + if (response instanceof ExistsTransactionSuccess success) { + future.set(success.getExists()); } else { failReadFuture(future, "Error executing exists request for path " + path, response); } @@ -243,8 +243,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Response response) { LOG.debug("Read request for {} completed with {}", path, response); - if (response instanceof ReadTransactionSuccess) { - future.set(((ReadTransactionSuccess) response).getData()); + if (response instanceof ReadTransactionSuccess success) { + future.set(success.getData()); } else { failReadFuture(future, "Error reading data for path " + path, response); } @@ -303,19 +303,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { - if (request instanceof ModifyTransactionRequest) { - handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ModifyTransactionRequest modifyRequest) { + handleForwardedModifyTransactionRequest(callback, modifyRequest); + } else if (request instanceof ReadTransactionRequest readRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + readRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); callback.accept(resp); }); - } else if (request instanceof ExistsTransactionRequest) { + } else if (request instanceof ExistsTransactionRequest existsRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + existsRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); callback.accept(resp); }); @@ -336,7 +336,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request {}" + request); + throw unhandledRequest(request); } } @@ -355,7 +355,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } final TransactionRequest tmp; - switch (maybeProto.get()) { + switch (maybeProto.orElseThrow()) { case ABORT: tmp = abortRequest(); sendRequest(tmp, resp -> { @@ -385,7 +385,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }); break; default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow()); } } } @@ -399,12 +399,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, final Consumer> callback, final long enqueuedTicks) { - if (request instanceof CommitLocalTransactionRequest) { - replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks); + if (request instanceof CommitLocalTransactionRequest commitRequest) { + replayLocalCommitRequest(commitRequest, callback, enqueuedTicks); } else if (request instanceof AbortLocalTransactionRequest) { enqueueRequest(abortRequest(), callback, enqueuedTicks); } else { - throw new IllegalStateException("Unhandled request " + request); + throw unhandledRequest(request); } } @@ -439,19 +439,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ }; final OptionalLong optTicks = OptionalLong.of(enqueuedTicks); - if (request instanceof ModifyTransactionRequest) { - handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ModifyTransactionRequest modifyRequest) { + handleReplayedModifyTransactionRequest(enqueuedTicks, cb, modifyRequest); + } else if (request instanceof ReadTransactionRequest readRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + readRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); - } else if (request instanceof ExistsTransactionRequest) { + } else if (request instanceof ExistsTransactionRequest existsRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + existsRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); @@ -472,14 +472,13 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(callback, enqueuedTicks); - } else if (request instanceof IncrementTransactionSequenceRequest) { - final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + } else if (request instanceof IncrementTransactionSequenceRequest req) { ensureFlushedBuider(optTicks); enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), snapshotOnly, req.getIncrement()), callback, enqueuedTicks); incrementSequence(req.getIncrement()); } else { - throw new IllegalArgumentException("Unhandled request {}" + request); + throw unhandledRequest(request); } } @@ -496,7 +495,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } final TransactionRequest tmp; - switch (maybeProto.get()) { + switch (maybeProto.orElseThrow()) { case ABORT: tmp = abortRequest(); enqueueRequest(tmp, resp -> { @@ -526,7 +525,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }, enqueuedTicks); break; default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow()); } } }