X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=0a5ead9d9bca1c049c98c3665caeeac5f3ee9a6a;hp=3869e66d3e90141b9390491de9c5c20afbd60a91;hb=94c5a97a4c7e1c2374b88327de205ad91088104e;hpb=32b322afd58f120a78208c939a01422aa224d0cf diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 3869e66d3e..0a5ead9d9b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -7,42 +7,39 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FluentFuture; +import java.util.Optional; import java.util.function.Consumer; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; -import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with - * the client instance. + * the client instance. This class is NOT thread-safe. * *

* It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations @@ -55,136 +52,123 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -@NotThreadSafe -final class LocalProxyTransaction extends AbstractProxyTransaction { +abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); private final TransactionIdentifier identifier; - private CursorAwareDataTreeModification modification; - private CursorAwareDataTreeSnapshot sealedModification; - - LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final CursorAwareDataTreeModification modification) { - super(parent); - this.identifier = Preconditions.checkNotNull(identifier); - this.modification = Preconditions.checkNotNull(modification); + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) { + super(parent, isDone); + this.identifier = requireNonNull(identifier); } @Override - public TransactionIdentifier getIdentifier() { + public final TransactionIdentifier getIdentifier() { return identifier; } - @Override - void doDelete(final YangInstanceIdentifier path) { - modification.delete(path); - } + abstract @NonNull DataTreeSnapshot readOnlyView(); - @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.merge(path, data); - } - - @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.write(path, data); - } + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback); - @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); - } + abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback, long enqueuedTicks); @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path)); - } - - private RuntimeException abortedException() { - return new IllegalStateException("Tracker " + identifier + " has been aborted"); - } - - private RuntimeException submittedException() { - return new IllegalStateException("Tracker " + identifier + " has been submitted"); + final FluentFuture doExists(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path).isPresent()); } @Override - void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { - LOG.debug("Transaction {} abort completed with {}", identifier, response); - }); + final FluentFuture> doRead(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path)); } @Override - CommitLocalTransactionRequest commitRequest(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(), - modification, coordinated); - modification = new FailedDataTreeModification(this::submittedException); - return ret; + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override - void doSeal() { - modification.ready(); - sealedModification = modification; - } - - DataTreeSnapshot getSnapshot() { - Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier); - return sealedModification; + void handleReplayedLocalRequest(final AbstractLocalTransactionRequest request, + final Consumer> callback, final long enqueuedTicks) { + if (request instanceof AbortLocalTransactionRequest) { + enqueueAbort(request, callback, enqueuedTicks); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } } - private void applyModifyTransactionRequest(final ModifyTransactionRequest request, - final @Nullable Consumer> callback) { - for (TransactionModification mod : request.getModifications()) { - if (mod instanceof TransactionWrite) { - modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); - } else if (mod instanceof TransactionMerge) { - modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); - } else if (mod instanceof TransactionDelete) { - modification.delete(mod.getPath()); - } else { - throw new IllegalArgumentException("Unsupported modification " + mod); + private boolean handleReadRequest(final TransactionRequest request, final Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. + if (request instanceof ReadTransactionRequest) { + final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); + final Optional result = readOnlyView().readNode(path); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = requireNonNull(callback); + executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + request.getSequence(), result))); } - } - - final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); - if (maybeProtocol.isPresent()) { - seal(); - Verify.verify(callback != null, "Request {} has null callback", request); - - switch (maybeProtocol.get()) { - case ABORT: - sendAbort(callback); - break; - case SIMPLE: - sendRequest(commitRequest(false), callback); - break; - case THREE_PHASE: - sendRequest(commitRequest(true), callback); - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); + return true; + } else if (request instanceof ExistsTransactionRequest) { + final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); + final boolean result = readOnlyView().readNode(path).isPresent(); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = requireNonNull(callback); + executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + request.getSequence(), result))); } + return true; + } else { + return false; } } @Override - void handleForwardedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { - LOG.debug("Applying forwaded request {}", request); + void handleReplayedRemoteRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + if (request instanceof ModifyTransactionRequest) { + replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback, enqueuedTicks); + } else if (request instanceof IncrementTransactionSequenceRequest) { + // Local transactions do not have non-replayable requests which would be visible to the backend, + // hence we can skip sequence increments. + LOG.debug("Not replaying {}", request); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + /** + * Remote-to-local equivalent of {@link #handleReplayedRemoteRequest(TransactionRequest, Consumer, long)}, + * except it is invoked in the forwarding path from + * {@link RemoteProxyTransaction#forwardToLocal(LocalProxyTransaction, TransactionRequest, Consumer)}. + * + * @param request Forwarded request + * @param callback Callback to be invoked once the request completes + */ + void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (handleReadRequest(request, callback)) { + // No-op + } else if (request instanceof TransactionPurgeRequest) { + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } } @Override - void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; final DataTreeModification mod = req.getModification(); @@ -192,12 +176,12 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { LOG.debug("Applying modification {} to successor {}", mod, successor); mod.applyToCursor(new AbstractDataTreeModificationCursor() { @Override - public void write(final PathArgument child, final NormalizedNode data) { + public void write(final PathArgument child, final NormalizedNode data) { successor.write(current().node(child), data); } @Override - public void merge(final PathArgument child, final NormalizedNode data) { + public void merge(final PathArgument child, final NormalizedNode data) { successor.merge(current().node(child), data); } @@ -207,44 +191,46 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.seal(); - + successor.sealOnly(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); } else if (request instanceof AbortLocalTransactionRequest) { LOG.debug("Forwarding abort {} to successor {}", request, successor); successor.abort(); + } else if (request instanceof TransactionPurgeRequest) { + LOG.debug("Forwarding purge {} to successor {}", request, successor); + successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } } @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); - } else if (request instanceof CommitLocalTransactionRequest) { - successor.sendCommit((CommitLocalTransactionRequest)request, callback); + } else if (request instanceof TransactionPurgeRequest) { + successor.enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } LOG.debug("Forwarded request {} to successor {}", request, successor); } - private void sendAbort(final TransactionRequest request, final Consumer> callback) { - sendRequest(request, callback); - modification = new FailedDataTreeModification(this::abortedException); + private static void throwUnhandledRequest(final TransactionRequest request) { + throw new IllegalArgumentException("Unhandled request " + request); } - private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { - // Rebase old modification on new data tree. - try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { - request.getModification().applyToCursor(cursor); - } + void sendAbort(final TransactionRequest request, final Consumer> callback) { + sendRequest(request, callback); + } - seal(); - sendRequest(commitRequest(request.isCoordinated()), callback); + void enqueueAbort(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + enqueueRequest(request, callback, enqueuedTicks); } }