X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=284be4a45793699576e412c88d559d15bc916bba;hb=refs%2Fchanges%2F10%2F78310%2F5;hp=e12c724db108dac0bb9341dc1bb04bff66175dd2;hpb=7e62b4a59f9e43bcd0585845f1aeb55c44199f27;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index e12c724db1..284be4a457 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -7,13 +7,13 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Function; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; @@ -36,10 +36,12 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPurgeReque import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.DataStoreUnavailableException; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -66,8 +68,6 @@ import org.slf4j.LoggerFactory; final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class); - private static final Function NOOP_EXCEPTION_MAPPER = ex -> ex; - // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; @@ -131,14 +131,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { FluentFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeExists(future, t), future); + isSnapshotOnly()), t -> completeExists(path, future, t), future); } @Override FluentFuture>> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeRead(future, t), future); + isSnapshotOnly()), t -> completeRead(path, future, t), future); } private void ensureInitializedBuilder() { @@ -197,15 +197,16 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // Happy path recordSuccessfulRequest(request); } else { - recordFailedResponse(response, NOOP_EXCEPTION_MAPPER); + recordFailedResponse(response); } } - private X recordFailedResponse(final Response response, - final Function exMapper) { + private Exception recordFailedResponse(final Response response) { final Exception failure; if (response instanceof RequestFailure) { - failure = ((RequestFailure) response).getCause(); + final RequestException cause = ((RequestFailure) response).getCause(); + failure = cause instanceof RequestTimeoutException + ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause; } else { LOG.warn("Unhandled response {}", response); failure = new IllegalArgumentException("Unhandled response " + response.getClass()); @@ -215,33 +216,35 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { LOG.debug("Transaction {} failed", getIdentifier(), failure); operationFailure = failure; } - return exMapper.apply(failure); + return failure; } - private void failReadFuture(final SettableFuture future, final Response response) { - future.setException(recordFailedResponse(response, ReadFailedException.MAPPER)); + private void failReadFuture(final SettableFuture future, final String message, + final Response response) { + future.setException(new ReadFailedException(message, recordFailedResponse(response))); } - private void completeExists(final SettableFuture future, final Response response) { - LOG.debug("Exists request completed with {}", response); + private void completeExists(final YangInstanceIdentifier path, final SettableFuture future, + final Response response) { + LOG.debug("Exists request for {} completed with {}", path, response); if (response instanceof ExistsTransactionSuccess) { future.set(((ExistsTransactionSuccess) response).getExists()); } else { - failReadFuture(future, response); + failReadFuture(future, "Error executing exists request for path " + path, response); } recordFinishedRequest(response); } - private void completeRead(final SettableFuture>> future, - final Response response) { - LOG.debug("Read request completed with {}", response); + private void completeRead(final YangInstanceIdentifier path, + final SettableFuture>> future, final Response response) { + LOG.debug("Read request for {} completed with {}", path, response); if (response instanceof ReadTransactionSuccess) { future.set(((ReadTransactionSuccess) response).getData()); } else { - failReadFuture(future, response); + failReadFuture(future, "Error reading data for path " + path, response); } recordFinishedRequest(response);