X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=e12c724db108dac0bb9341dc1bb04bff66175dd2;hb=634dfac8eead60f443bf75e749c70d1f2bb29198;hp=5a6b539494e3577f8d0642e95ec2bcc47a3f72a0;hpb=7991491f2854dde2ec625ed6c08b44df7d258795;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 5a6b539494..e12c724db1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -7,11 +7,11 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.Optional; import java.util.function.Consumer; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; @@ -41,7 +41,7 @@ import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; import org.opendaylight.mdsal.common.api.ReadFailedException; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -66,6 +66,8 @@ import org.slf4j.LoggerFactory; final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class); + private static final Function NOOP_EXCEPTION_MAPPER = ex -> ex; + // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; @@ -97,42 +99,43 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path), Optional.absent()); + appendModification(new TransactionDelete(path), Optional.empty()); } @Override void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data), Optional.absent()); + appendModification(new TransactionMerge(path, data), Optional.empty()); } @Override void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data), Optional.absent()); + appendModification(new TransactionWrite(path, data), Optional.empty()); } - private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, + private FluentFuture sendReadRequest(final AbstractReadTransactionRequest request, final Consumer> completer, final ListenableFuture future) { // Check if a previous operation failed. If it has, do not bother sending anything and report a failure final Exception local = operationFailure; if (local != null) { - return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local)); + return FluentFutures.immediateFailedFluentFuture( + new ReadFailedException("Previous operation failed", local)); } // Make sure we send any modifications before issuing a read ensureFlushedBuider(); sendRequest(request, completer); - return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); + return FluentFuture.from(future); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { + FluentFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, isSnapshotOnly()), t -> completeExists(future, t), future); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + FluentFuture>> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, isSnapshotOnly()), t -> completeRead(future, t), future); @@ -146,7 +149,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - ensureFlushedBuider(Optional.absent()); + ensureFlushedBuider(Optional.empty()); } private void ensureFlushedBuider(final Optional enqueuedTicks) { @@ -171,7 +174,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void appendModification(final TransactionModification modification) { - appendModification(modification, Optional.absent()); + appendModification(modification, Optional.empty()); } private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { @@ -194,11 +197,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // Happy path recordSuccessfulRequest(request); } else { - recordFailedResponse(response); + recordFailedResponse(response, NOOP_EXCEPTION_MAPPER); } } - private Exception recordFailedResponse(final Response response) { + private X recordFailedResponse(final Response response, + final Function exMapper) { final Exception failure; if (response instanceof RequestFailure) { failure = ((RequestFailure) response).getCause(); @@ -211,11 +215,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { LOG.debug("Transaction {} failed", getIdentifier(), failure); operationFailure = failure; } - return failure; + return exMapper.apply(failure); } - private void failFuture(final SettableFuture future, final Response response) { - future.setException(recordFailedResponse(response)); + private void failReadFuture(final SettableFuture future, final Response response) { + future.setException(recordFailedResponse(response, ReadFailedException.MAPPER)); } private void completeExists(final SettableFuture future, final Response response) { @@ -224,7 +228,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (response instanceof ExistsTransactionSuccess) { future.set(((ExistsTransactionSuccess) response).getExists()); } else { - failFuture(future, response); + failReadFuture(future, response); } recordFinishedRequest(response); @@ -237,7 +241,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (response instanceof ReadTransactionSuccess) { future.set(((ReadTransactionSuccess) response).getData()); } else { - failFuture(future, response); + failReadFuture(future, response); } recordFinishedRequest(response); @@ -295,52 +299,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - final ModifyTransactionRequest req = (ModifyTransactionRequest) request; - - req.getModifications().forEach(this::appendModification); - - final java.util.Optional maybeProto = req.getPersistenceProtocol(); - if (maybeProto.isPresent()) { - // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions - // until we know what we are going to do. - if (markSealed()) { - sealOnly(); - } - - final TransactionRequest tmp; - switch (maybeProto.get()) { - case ABORT: - tmp = abortRequest(); - sendRequest(tmp, resp -> { - completeModify(tmp, resp); - callback.accept(resp); - }); - break; - case SIMPLE: - tmp = commitRequest(false); - sendRequest(tmp, resp -> { - completeModify(tmp, resp); - callback.accept(resp); - }); - break; - case THREE_PHASE: - tmp = commitRequest(true); - sendRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - callback.accept(resp); - }); - break; - case READY: - tmp = readyRequest(); - sendRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - callback.accept(resp); - }); - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); - } - } + handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request); } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), @@ -376,6 +335,54 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } + private void handleForwardedModifyTransactionRequest(final Consumer> callback, + final ModifyTransactionRequest req) { + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case SIMPLE: + tmp = commitRequest(false); + sendRequest(tmp, resp -> { + completeModify(tmp, resp); + callback.accept(resp); + }); + break; + case THREE_PHASE: + tmp = commitRequest(true); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + case READY: + tmp = readyRequest(); + sendRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + callback.accept(resp); + }); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } + @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, final Consumer> callback) { @@ -421,58 +428,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void handleReplayedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback, final long enqueuedTicks) { - final Consumer> cb = callback != null ? callback : resp -> { }; + @Nullable final Consumer> callback, final long enqueuedTicks) { + final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ }; final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); if (request instanceof ModifyTransactionRequest) { - final ModifyTransactionRequest req = (ModifyTransactionRequest) request; - for (TransactionModification mod : req.getModifications()) { - appendModification(mod, optTicks); - } - - final java.util.Optional maybeProto = req.getPersistenceProtocol(); - if (maybeProto.isPresent()) { - // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions - // until we know what we are going to do. - if (markSealed()) { - sealOnly(); - } - - final TransactionRequest tmp; - switch (maybeProto.get()) { - case ABORT: - tmp = abortRequest(); - enqueueRequest(tmp, resp -> { - completeModify(tmp, resp); - cb.accept(resp); - }, enqueuedTicks); - break; - case SIMPLE: - tmp = commitRequest(false); - enqueueRequest(tmp, resp -> { - completeModify(tmp, resp); - cb.accept(resp); - }, enqueuedTicks); - break; - case THREE_PHASE: - tmp = commitRequest(true); - enqueueRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - cb.accept(resp); - }, enqueuedTicks); - break; - case READY: - tmp = readyRequest(); - enqueueRequest(tmp, resp -> { - recordSuccessfulRequest(tmp); - cb.accept(resp); - }, enqueuedTicks); - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); - } - } + handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request); } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), @@ -514,4 +475,52 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { throw new IllegalArgumentException("Unhandled request {}" + request); } } + + private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer> cb, + final ModifyTransactionRequest req) { + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions + // until we know what we are going to do. + if (markSealed()) { + sealOnly(); + } + + final TransactionRequest tmp; + switch (maybeProto.get()) { + case ABORT: + tmp = abortRequest(); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case SIMPLE: + tmp = commitRequest(false); + enqueueRequest(tmp, resp -> { + completeModify(tmp, resp); + cb.accept(resp); + }, enqueuedTicks); + break; + case THREE_PHASE: + tmp = commitRequest(true); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + case READY: + tmp = readyRequest(); + enqueueRequest(tmp, resp -> { + recordSuccessfulRequest(tmp); + cb.accept(resp); + }, enqueuedTicks); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } }