X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FRemoteProxyTransaction.java;h=192205dc0a9c0f1df53b8177484f8bf3c0a2282f;hp=9fb1b89580e3557a346f4925e31d39d4d0e86f22;hb=3ee40198347cfb53bd0ce12ffd625cff8ed2383b;hpb=9b4f21460c6dcb10c381df631d064d05de16546c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 9fb1b89580..192205dc0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -8,30 +8,44 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +53,11 @@ import org.slf4j.LoggerFactory; * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently * not known or is known to be not co-located with the client. * + *

* It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for * maintaining any submitted operations until the leader is discovered. * + *

* This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state * transitions based on backend responses are thread-safe. * @@ -54,15 +70,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final int REQUEST_MAX_MODIFICATIONS = 1000; private final ModifyTransactionRequestBuilder builder; + private final boolean sendReadyOnSeal; + private final boolean snapshotOnly; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, - final TransactionIdentifier identifier) { - super(client); - builder = new ModifyTransactionRequestBuilder(identifier, client.self()); + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final boolean snapshotOnly, final boolean sendReadyOnSeal) { + super(parent); + this.snapshotOnly = snapshotOnly; + this.sendReadyOnSeal = sendReadyOnSeal; + builder = new ModifyTransactionRequestBuilder(identifier, localActor()); + } + + @Override + boolean isSnapshotOnly() { + return snapshotOnly; } @Override @@ -95,32 +120,32 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // Make sure we send any modifications before issuing a read ensureFlushedBuider(); - client().sendRequest(request, completer); + sendRequest(request, completer); return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); } @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), - t -> completeExists(future, t), future); + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), - t -> completeRead(future, t), future); + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeRead(future, t), future); } @Override void doAbort() { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.setAbort(); flushBuilder(); } - private void ensureInitializedBuider() { + private void ensureInitializedBuilder() { if (!builderBusy) { builder.setSequence(nextSequence()); builderBusy = true; @@ -134,13 +159,78 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void flushBuilder() { - client().sendRequest(builder.build(), this::completeModify); + final ModifyTransactionRequest request = builder.build(); builderBusy = false; + + sendModification(request); + } + + private void sendModification(final TransactionRequest request) { + sendRequest(request, response -> completeModify(request, response)); + } + + @Override + void handleForwardedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + replayLocalCommitRequest((CommitLocalTransactionRequest) request, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + sendRequest(abortRequest(), callback); + } else { + throw new IllegalStateException("Unhandled request " + request); + } + } + + private void replayLocalCommitRequest(final CommitLocalTransactionRequest request, + final Consumer> callback) { + final DataTreeModification mod = request.getModification(); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + doWrite(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + doMerge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + doDelete(current().node(child)); + } + }); + + sendRequest(commitRequest(request.isCoordinated()), callback); + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + nextSequence(); + + if (callback == null) { + sendModification(request); + return; + } + + /* + * FindBugs is utterly stupid, as it does not recognize the fact that we have checked for null + * and reports NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE in the lambda below. + */ + final Consumer> findBugsIsStupid = callback; + + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. + sendRequest(request, response -> { + findBugsIsStupid.accept(Preconditions.checkNotNull(response)); + completeModify(request, response); + }); } private void appendModification(final TransactionModification modification) { if (operationFailure == null) { - ensureInitializedBuider(); + ensureInitializedBuilder(); builder.addModification(modification); if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { @@ -151,11 +241,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } } - private void completeModify(final Response response) { - LOG.debug("Modification request completed with {}", response); + private void completeModify(final TransactionRequest request, final Response response) { + LOG.debug("Modification request {} completed with {}", request, response); if (response instanceof TransactionSuccess) { - // Happy path no-op + // Happy path + recordSuccessfulRequest(request); } else { recordFailedResponse(response); } @@ -189,9 +280,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); } - private void completeRead(final SettableFuture>> future, final Response response) { + private void completeRead(final SettableFuture>> future, + final Response response) { LOG.debug("Read request completed with {}", response); if (response instanceof ReadTransactionSuccess) { @@ -199,11 +293,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); + } + + private ModifyTransactionRequest abortRequest() { + ensureInitializedBuilder(); + builder.setAbort(); + final ModifyTransactionRequest ret = builder.build(); + builderBusy = false; + return ret; } @Override - ModifyTransactionRequest doCommit(final boolean coordinated) { - ensureInitializedBuider(); + ModifyTransactionRequest commitRequest(final boolean coordinated) { + ensureInitializedBuilder(); builder.setCommit(coordinated); final ModifyTransactionRequest ret = builder.build(); @@ -213,6 +317,82 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void doSeal() { - // No-op + if (sendReadyOnSeal) { + ensureInitializedBuilder(); + builder.setReady(); + flushBuilder(); + } + } + + @Override + void flushState(final AbstractProxyTransaction successor) { + if (builderBusy) { + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + successor.handleForwardedRemoteRequest(request, null); + } + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + successor.handleForwardedRequest(request, callback); + } + + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { + if (request instanceof ModifyTransactionRequest) { + final ModifyTransactionRequest req = (ModifyTransactionRequest) request; + + req.getModifications().forEach(this::appendModification); + + final java.util.Optional maybeProto = req.getPersistenceProtocol(); + if (maybeProto.isPresent()) { + ensureSealed(); + + switch (maybeProto.get()) { + case ABORT: + sendRequest(abortRequest(), callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + case READY: + //no op + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); + } + } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(); + sendAbort(callback); + } else if (request instanceof TransactionPurgeRequest) { + purge(); + } else { + throw new IllegalArgumentException("Unhandled request {}" + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + successor.handleForwardedRemoteRequest(request, callback); } }