X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=d0d7946f62451cca0f6c34750166ef692cbc272d;hb=744e836af4ebf52e6be62308558355ccc228546c;hp=3120f6f4edad443624b4ec989f541312e44b7144;hpb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 3120f6f4ed..d0d7946f62 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -7,13 +7,15 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import static com.google.common.base.Verify.verify; + +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; +import java.util.OptionalLong; import java.util.function.Consumer; -import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; @@ -36,12 +38,14 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPurgeReque import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.DataStoreUnavailableException; import org.opendaylight.mdsal.common.api.ReadFailedException; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -97,45 +101,46 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path), Optional.absent()); + appendModification(new TransactionDelete(path), OptionalLong.empty()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data), Optional.absent()); + appendModification(new TransactionMerge(path, data), OptionalLong.empty()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data), Optional.absent()); + appendModification(new TransactionWrite(path, data), OptionalLong.empty()); } - private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, + private FluentFuture sendReadRequest(final AbstractReadTransactionRequest request, final Consumer> completer, final ListenableFuture future) { // Check if a previous operation failed. If it has, do not bother sending anything and report a failure final Exception local = operationFailure; if (local != null) { - return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local)); + return FluentFutures.immediateFailedFluentFuture( + new ReadFailedException("Previous operation failed", local)); } // Make sure we send any modifications before issuing a read ensureFlushedBuider(); sendRequest(request, completer); - return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); + return FluentFuture.from(future); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { + FluentFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeExists(future, t), future); + isSnapshotOnly()), t -> completeExists(path, future, t), future); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + FluentFuture>> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, - isSnapshotOnly()), t -> completeRead(future, t), future); + isSnapshotOnly()), t -> completeRead(path, future, t), future); } private void ensureInitializedBuilder() { @@ -146,35 +151,35 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - ensureFlushedBuider(Optional.absent()); + ensureFlushedBuider(OptionalLong.empty()); } - private void ensureFlushedBuider(final Optional enqueuedTicks) { + private void ensureFlushedBuider(final OptionalLong enqueuedTicks) { if (builderBusy) { flushBuilder(enqueuedTicks); } } - private void flushBuilder(final Optional enqueuedTicks) { + private void flushBuilder(final OptionalLong enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; sendModification(request, enqueuedTicks); } - private void sendModification(final TransactionRequest request, final Optional enqueuedTicks) { + private void sendModification(final TransactionRequest request, final OptionalLong enqueuedTicks) { if (enqueuedTicks.isPresent()) { - enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue()); + enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.getAsLong()); } else { sendRequest(request, response -> completeModify(request, response)); } } private void appendModification(final TransactionModification modification) { - appendModification(modification, Optional.absent()); + appendModification(modification, OptionalLong.empty()); } - private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { + private void appendModification(final TransactionModification modification, final OptionalLong enqueuedTicks) { if (operationFailure == null) { ensureInitializedBuilder(); @@ -201,7 +206,9 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private Exception recordFailedResponse(final Response response) { final Exception failure; if (response instanceof RequestFailure) { - failure = ((RequestFailure) response).getCause(); + final RequestException cause = ((RequestFailure) response).getCause(); + failure = cause instanceof RequestTimeoutException + ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause; } else { LOG.warn("Unhandled response {}", response); failure = new IllegalArgumentException("Unhandled response " + response.getClass()); @@ -214,30 +221,32 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { return failure; } - private void failFuture(final SettableFuture future, final Response response) { - future.setException(recordFailedResponse(response)); + private void failReadFuture(final SettableFuture future, final String message, + final Response response) { + future.setException(new ReadFailedException(message, recordFailedResponse(response))); } - private void completeExists(final SettableFuture future, final Response response) { - LOG.debug("Exists request completed with {}", response); + private void completeExists(final YangInstanceIdentifier path, final SettableFuture future, + final Response response) { + LOG.debug("Exists request for {} completed with {}", path, response); if (response instanceof ExistsTransactionSuccess) { future.set(((ExistsTransactionSuccess) response).getExists()); } else { - failFuture(future, response); + failReadFuture(future, "Error executing exists request for path " + path, response); } recordFinishedRequest(response); } - private void completeRead(final SettableFuture>> future, - final Response response) { - LOG.debug("Read request completed with {}", response); + private void completeRead(final YangInstanceIdentifier path, + final SettableFuture>> future, final Response response) { + LOG.debug("Read request for {} completed with {}", path, response); if (response instanceof ReadTransactionSuccess) { future.set(((ReadTransactionSuccess) response).getData()); } else { - failFuture(future, response); + failReadFuture(future, "Error reading data for path " + path, response); } recordFinishedRequest(response); @@ -267,7 +276,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - boolean sealAndSend(final Optional enqueuedTicks) { + boolean sealAndSend(final OptionalLong enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); @@ -277,14 +286,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - java.util.Optional flushState() { + Optional flushState() { if (!builderBusy) { - return java.util.Optional.empty(); + return Optional.empty(); } final ModifyTransactionRequest request = builder.build(); builderBusy = false; - return java.util.Optional.of(request); + return Optional.of(request); } @Override @@ -335,12 +344,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final ModifyTransactionRequest req) { req.getModifications().forEach(this::appendModification); - final java.util.Optional maybeProto = req.getPersistenceProtocol(); + final Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions // until we know what we are going to do. if (markSealed()) { - sealOnly(); + if (!sealOnly()) { + LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this); + } } final TransactionRequest tmp; @@ -400,7 +411,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, final Consumer> callback, final long enqueuedTicks) { final DataTreeModification mod = request.getModification(); - final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + final OptionalLong optTicks = OptionalLong.of(enqueuedTicks); mod.applyToCursor(new AbstractDataTreeModificationCursor() { @Override @@ -423,10 +434,10 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - void handleReplayedRemoteRequest(final TransactionRequest request, - @Nullable final Consumer> callback, final long enqueuedTicks) { + void handleReplayedRemoteRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ }; - final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + final OptionalLong optTicks = OptionalLong.of(enqueuedTicks); if (request instanceof ModifyTransactionRequest) { handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request); @@ -476,12 +487,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final ModifyTransactionRequest req) { req.getModifications().forEach(this::appendModification); - final java.util.Optional maybeProto = req.getPersistenceProtocol(); + final Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions // until we know what we are going to do. if (markSealed()) { - sealOnly(); + verify(sealOnly(), "Attempted to replay seal on %s", this); } final TransactionRequest tmp;