X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=347c7eac033ae1bf4a63db2f3d4ba0d26b981d6c;hp=26d718def5d267381f1ae1b5a0879fce7a2ca6e6;hb=320a4e5cd2d9d80468a3f82798744f2035488218;hpb=5fd8e6506248cc34da72281a1662612f6c2b2f9a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 26d718def5..347c7eac03 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -8,27 +8,34 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -55,15 +62,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; + private final Collection> successfulRequests = new ArrayList<>(); private final ModifyTransactionRequestBuilder builder; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier) { - super(client); + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + super(parent); builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } @@ -136,10 +143,38 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void flushBuilder() { - final ModifyTransactionRequest message = builder.build(); + final ModifyTransactionRequest request = builder.build(); builderBusy = false; - sendRequest(message, this::completeModify); + sendModification(request); + } + + private void sendModification(final TransactionRequest request) { + sendRequest(request, response -> completeModify(request, response)); + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + nextSequence(); + + if (callback == null) { + sendModification(request); + return; + } + + /* + * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null + * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below. + */ + final Consumer> findBugsIsStupid = callback; + + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. + sendRequest(request, response -> { + findBugsIsStupid.accept(Preconditions.checkNotNull(response)); + completeModify(request, response); + }); } private void appendModification(final TransactionModification modification) { @@ -155,11 +190,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void completeModify(final Response response) { - LOG.debug("Modification request completed with {}", response); + private void completeModify(final TransactionRequest request, final Response response) { + LOG.debug("Modification request {} completed with {}", request, response); if (response instanceof TransactionSuccess) { - // Happy path no-op + // Happy path + successfulRequests.add(request); } else { recordFailedResponse(response); } @@ -207,7 +243,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } @Override - ModifyTransactionRequest doCommit(final boolean coordinated) { + ModifyTransactionRequest commitRequest(final boolean coordinated) { ensureInitializedBuider(); builder.setCommit(coordinated); @@ -220,4 +256,57 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { void doSeal() { // No-op } + + @Override + void replaySuccessfulRequests(final AbstractProxyTransaction successor) { + super.replaySuccessfulRequests(successor); + + for (TransactionRequest req : successfulRequests) { + LOG.debug("Forwarding request {} to successor {}", req, successor); + successor.handleForwardedRemoteRequest(req, null); + } + successfulRequests.clear(); + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + successor.handleForwardedRequest(request, callback); + } + + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) + throws RequestException { + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + seal(); + + switch (maybeProto.get()) { + case ABORT: + sendAbort(callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) throws RequestException { + successor.handleForwardedRemoteRequest(request, callback); + } }