X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=946e3341fd8f7778fdbf940299deb72112a61284;hb=HEAD;hp=284be4a45793699576e412c88d559d15bc916bba;hpb=2b702880c19e11be077ddcc540aeacd80ecfcaf6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 284be4a457..946e3341fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -7,12 +7,14 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Verify.verify; + import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; +import java.util.OptionalLong; import java.util.function.Consumer; -import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; @@ -47,7 +49,7 @@ import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,18 +64,14 @@ import org.slf4j.LoggerFactory; *

* This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state * transitions based on backend responses are thread-safe. - * - * @author Robert Varga */ final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class); - // FIXME: make this tuneable - private static final int REQUEST_MAX_MODIFICATIONS = 1000; - private final ModifyTransactionRequestBuilder builder; private final boolean sendReadyOnSeal; private final boolean snapshotOnly; + private final int maxModifications; private boolean builderBusy; @@ -85,6 +83,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { this.snapshotOnly = snapshotOnly; this.sendReadyOnSeal = sendReadyOnSeal; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); + maxModifications = parent.parent().actorUtils().getDatastoreContext().getShardBatchedModificationCount(); } @Override @@ -99,17 +98,17 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doDelete(final YangInstanceIdentifier path) { - appendModification(new TransactionDelete(path), Optional.empty()); + appendModification(new TransactionDelete(path), OptionalLong.empty()); } @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionMerge(path, data), Optional.empty()); + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + appendModification(new TransactionMerge(path, data), OptionalLong.empty()); } @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - appendModification(new TransactionWrite(path, data), Optional.empty()); + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + appendModification(new TransactionWrite(path, data), OptionalLong.empty()); } private FluentFuture sendReadRequest(final AbstractReadTransactionRequest request, @@ -135,8 +134,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - FluentFuture>> doRead(final YangInstanceIdentifier path) { - final SettableFuture>> future = SettableFuture.create(); + FluentFuture> doRead(final YangInstanceIdentifier path) { + final SettableFuture> future = SettableFuture.create(); return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, isSnapshotOnly()), t -> completeRead(path, future, t), future); } @@ -149,40 +148,40 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void ensureFlushedBuider() { - ensureFlushedBuider(Optional.empty()); + ensureFlushedBuider(OptionalLong.empty()); } - private void ensureFlushedBuider(final Optional enqueuedTicks) { + private void ensureFlushedBuider(final OptionalLong enqueuedTicks) { if (builderBusy) { flushBuilder(enqueuedTicks); } } - private void flushBuilder(final Optional enqueuedTicks) { + private void flushBuilder(final OptionalLong enqueuedTicks) { final ModifyTransactionRequest request = builder.build(); builderBusy = false; sendModification(request, enqueuedTicks); } - private void sendModification(final TransactionRequest request, final Optional enqueuedTicks) { + private void sendModification(final TransactionRequest request, final OptionalLong enqueuedTicks) { if (enqueuedTicks.isPresent()) { - enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.get().longValue()); + enqueueRequest(request, response -> completeModify(request, response), enqueuedTicks.orElseThrow()); } else { sendRequest(request, response -> completeModify(request, response)); } } private void appendModification(final TransactionModification modification) { - appendModification(modification, Optional.empty()); + appendModification(modification, OptionalLong.empty()); } - private void appendModification(final TransactionModification modification, final Optional enqueuedTicks) { + private void appendModification(final TransactionModification modification, final OptionalLong enqueuedTicks) { if (operationFailure == null) { ensureInitializedBuilder(); builder.addModification(modification); - if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { + if (builder.size() >= maxModifications) { flushBuilder(enqueuedTicks); } } else { @@ -203,8 +202,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private Exception recordFailedResponse(final Response response) { final Exception failure; - if (response instanceof RequestFailure) { - final RequestException cause = ((RequestFailure) response).getCause(); + if (response instanceof RequestFailure requestFailure) { + final RequestException cause = requestFailure.getCause(); failure = cause instanceof RequestTimeoutException ? new DataStoreUnavailableException(cause.getMessage(), cause) : cause; } else { @@ -228,8 +227,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final Response response) { LOG.debug("Exists request for {} completed with {}", path, response); - if (response instanceof ExistsTransactionSuccess) { - future.set(((ExistsTransactionSuccess) response).getExists()); + if (response instanceof ExistsTransactionSuccess success) { + future.set(success.getExists()); } else { failReadFuture(future, "Error executing exists request for path " + path, response); } @@ -237,12 +236,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { recordFinishedRequest(response); } - private void completeRead(final YangInstanceIdentifier path, - final SettableFuture>> future, final Response response) { + private void completeRead(final YangInstanceIdentifier path, final SettableFuture> future, + final Response response) { LOG.debug("Read request for {} completed with {}", path, response); - if (response instanceof ReadTransactionSuccess) { - future.set(((ReadTransactionSuccess) response).getData()); + if (response instanceof ReadTransactionSuccess success) { + future.set(success.getData()); } else { failReadFuture(future, "Error reading data for path " + path, response); } @@ -274,7 +273,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - boolean sealAndSend(final Optional enqueuedTicks) { + boolean sealAndSend(final OptionalLong enqueuedTicks) { if (sendReadyOnSeal) { ensureInitializedBuilder(); builder.setReady(); @@ -284,14 +283,14 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - java.util.Optional flushState() { + Optional flushState() { if (!builderBusy) { - return java.util.Optional.empty(); + return Optional.empty(); } final ModifyTransactionRequest request = builder.build(); builderBusy = false; - return java.util.Optional.of(request); + return Optional.of(request); } @Override @@ -301,19 +300,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { - if (request instanceof ModifyTransactionRequest) { - handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ModifyTransactionRequest modifyRequest) { + handleForwardedModifyTransactionRequest(callback, modifyRequest); + } else if (request instanceof ReadTransactionRequest readRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + readRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); callback.accept(resp); }); - } else if (request instanceof ExistsTransactionRequest) { + } else if (request instanceof ExistsTransactionRequest existsRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + existsRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); callback.accept(resp); }); @@ -334,7 +333,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request {}" + request); + throw unhandledRequest(request); } } @@ -342,16 +341,18 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final ModifyTransactionRequest req) { req.getModifications().forEach(this::appendModification); - final java.util.Optional maybeProto = req.getPersistenceProtocol(); + final Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions // until we know what we are going to do. if (markSealed()) { - sealOnly(); + if (!sealOnly()) { + LOG.debug("Proxy {} has a successor, which should receive seal through a separate request", this); + } } final TransactionRequest tmp; - switch (maybeProto.get()) { + switch (maybeProto.orElseThrow()) { case ABORT: tmp = abortRequest(); sendRequest(tmp, resp -> { @@ -381,7 +382,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }); break; default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow()); } } } @@ -395,28 +396,28 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, final Consumer> callback, final long enqueuedTicks) { - if (request instanceof CommitLocalTransactionRequest) { - replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback, enqueuedTicks); + if (request instanceof CommitLocalTransactionRequest commitRequest) { + replayLocalCommitRequest(commitRequest, callback, enqueuedTicks); } else if (request instanceof AbortLocalTransactionRequest) { enqueueRequest(abortRequest(), callback, enqueuedTicks); } else { - throw new IllegalStateException("Unhandled request " + request); + throw unhandledRequest(request); } } private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, final Consumer> callback, final long enqueuedTicks) { final DataTreeModification mod = request.getModification(); - final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + final OptionalLong optTicks = OptionalLong.of(enqueuedTicks); mod.applyToCursor(new AbstractDataTreeModificationCursor() { @Override - public void write(final PathArgument child, final NormalizedNode data) { + public void write(final PathArgument child, final NormalizedNode data) { appendModification(new TransactionWrite(current().node(child), data), optTicks); } @Override - public void merge(final PathArgument child, final NormalizedNode data) { + public void merge(final PathArgument child, final NormalizedNode data) { appendModification(new TransactionMerge(current().node(child), data), optTicks); } @@ -430,24 +431,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - void handleReplayedRemoteRequest(final TransactionRequest request, - @Nullable final Consumer> callback, final long enqueuedTicks) { + void handleReplayedRemoteRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { final Consumer> cb = callback != null ? callback : resp -> { /* NOOP */ }; - final Optional optTicks = Optional.of(Long.valueOf(enqueuedTicks)); + final OptionalLong optTicks = OptionalLong.of(enqueuedTicks); - if (request instanceof ModifyTransactionRequest) { - handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { + if (request instanceof ModifyTransactionRequest modifyRequest) { + handleReplayedModifyTransactionRequest(enqueuedTicks, cb, modifyRequest); + } else if (request instanceof ReadTransactionRequest readRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + readRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); - } else if (request instanceof ExistsTransactionRequest) { + } else if (request instanceof ExistsTransactionRequest existsRequest) { ensureFlushedBuider(optTicks); enqueueRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), resp -> { + existsRequest.getPath(), isSnapshotOnly()), resp -> { recordFinishedRequest(resp); cb.accept(resp); }, enqueuedTicks); @@ -468,14 +469,13 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { enqueueDoAbort(callback, enqueuedTicks); } else if (request instanceof TransactionPurgeRequest) { enqueuePurge(callback, enqueuedTicks); - } else if (request instanceof IncrementTransactionSequenceRequest) { - final IncrementTransactionSequenceRequest req = (IncrementTransactionSequenceRequest) request; + } else if (request instanceof IncrementTransactionSequenceRequest req) { ensureFlushedBuider(optTicks); enqueueRequest(new IncrementTransactionSequenceRequest(getIdentifier(), nextSequence(), localActor(), snapshotOnly, req.getIncrement()), callback, enqueuedTicks); incrementSequence(req.getIncrement()); } else { - throw new IllegalArgumentException("Unhandled request {}" + request); + throw unhandledRequest(request); } } @@ -483,16 +483,16 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { final ModifyTransactionRequest req) { req.getModifications().forEach(this::appendModification); - final java.util.Optional maybeProto = req.getPersistenceProtocol(); + final Optional maybeProto = req.getPersistenceProtocol(); if (maybeProto.isPresent()) { // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions // until we know what we are going to do. if (markSealed()) { - sealOnly(); + verify(sealOnly(), "Attempted to replay seal on %s", this); } final TransactionRequest tmp; - switch (maybeProto.get()) { + switch (maybeProto.orElseThrow()) { case ABORT: tmp = abortRequest(); enqueueRequest(tmp, resp -> { @@ -522,7 +522,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { }, enqueuedTicks); break; default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.orElseThrow()); } } }