X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=662ccba85d826c13ec280011a1629f921cc5c74c;hb=refs%2Fchanges%2F55%2F100655%2F2;hp=5b47f22971c9af76298e82032202871bb986b96b;hpb=9cab91f5204c0f55ee269a507269f02d5fe5e90b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 5b47f22971..662ccba85d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -7,13 +7,13 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FluentFuture; +import java.util.Optional; import java.util.function.Consumer; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; @@ -28,18 +28,18 @@ import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; -import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with - * the client instance. + * the client instance. This class is NOT thread-safe. * *

* It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations @@ -52,15 +52,14 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -@NotThreadSafe abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); private final TransactionIdentifier identifier; - LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { - super(parent); - this.identifier = Preconditions.checkNotNull(identifier); + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final boolean isDone) { + super(parent, isDone); + this.identifier = requireNonNull(identifier); } @Override @@ -68,28 +67,27 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { return identifier; } - abstract DataTreeSnapshot readOnlyView(); + abstract @NonNull DataTreeSnapshot readOnlyView(); - abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback, long enqueuedTicks); @Override - final CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); + final FluentFuture doExists(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path).isPresent()); } @Override - final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); + final FluentFuture> doRead(final YangInstanceIdentifier path) { + return FluentFutures.immediateFluentFuture(readOnlyView().readNode(path)); } @Override - final void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), - response -> LOG.debug("Transaction {} abort completed with {}", identifier, response)); + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override @@ -102,27 +100,22 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } } - private boolean handleReadRequest(final TransactionRequest request, - final @Nullable Consumer> callback) { + private boolean handleReadRequest(final TransactionRequest request, final Consumer> callback) { // Note we delay completion of read requests to limit the scope at which the client can run, as they have // listeners, which we do not want to execute while we are reconnecting. if (request instanceof ReadTransactionRequest) { - final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); - final Optional> result = readOnlyView().readNode(path); if (callback != null) { - // XXX: FB does not see that callback is final, on stack and has be check for non-null. - final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); - executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); + final Optional result = readOnlyView().readNode(path); + executeInActor(() -> callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result))); } return true; } else if (request instanceof ExistsTransactionRequest) { - final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); - final boolean result = readOnlyView().readNode(path).isPresent(); if (callback != null) { - // XXX: FB does not see that callback is final, on stack and has be check for non-null. - final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); - executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); + final boolean result = readOnlyView().readNode(path).isPresent(); + executeInActor(() -> callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result))); } return true; @@ -132,14 +125,14 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } @Override - void handleReplayedRemoteRequest(final TransactionRequest request, - final @Nullable Consumer> callback, final long enqueuedTicks) { + void handleReplayedRemoteRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { if (request instanceof ModifyTransactionRequest) { replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks); } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); } else if (request instanceof IncrementTransactionSequenceRequest) { // Local transactions do not have non-replayable requests which would be visible to the backend, // hence we can skip sequence increments. @@ -159,11 +152,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { */ void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -179,12 +172,12 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { LOG.debug("Applying modification {} to successor {}", mod, successor); mod.applyToCursor(new AbstractDataTreeModificationCursor() { @Override - public void write(final PathArgument child, final NormalizedNode data) { + public void write(final PathArgument child, final NormalizedNode data) { successor.write(current().node(child), data); } @Override - public void merge(final PathArgument child, final NormalizedNode data) { + public void merge(final PathArgument child, final NormalizedNode data) { successor.merge(current().node(child), data); } @@ -194,8 +187,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } }); - successor.ensureSealed(); - + successor.sealOnly(); final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); successor.sendRequest(successorReq, callback); } else if (request instanceof AbortLocalTransactionRequest) { @@ -203,9 +195,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { successor.abort(); } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); - successor.sendPurge(); + successor.enqueuePurge(callback); + } else if (request instanceof ModifyTransactionRequest) { + successor.handleForwardedRequest(request, callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } } @@ -215,14 +209,18 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.sendPurge(); + successor.enqueuePurge(callback); } else { - throw new IllegalArgumentException("Unhandled request" + request); + throwUnhandledRequest(request); } LOG.debug("Forwarded request {} to successor {}", request, successor); } + private static void throwUnhandledRequest(final TransactionRequest request) { + throw new IllegalArgumentException("Unhandled request " + request); + } + void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); }