X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=14ad54699161a60719ea846d353c46e73fb4adea;hb=HEAD;hp=84546632f099bb3302938f6d0e8990c3f93e5e3b;hpb=25ec94fd44c0912d6a00afdcf83110d8d0fb0236;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 84546632f0..14ad546991 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -7,31 +7,34 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.base.Verify; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; +import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.NotThreadSafe; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -45,8 +48,8 @@ import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -62,15 +65,13 @@ import org.slf4j.LoggerFactory; *

* This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction. - * - * @author Robert Varga */ -abstract class AbstractProxyTransaction implements Identifiable { +abstract sealed class AbstractProxyTransaction implements Identifiable + permits LocalProxyTransaction, RemoteProxyTransaction { /** * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint - * and allows compressing multiple requests into a single entry. + * and allows compressing multiple requests into a single entry. This class is not thread-safe. */ - @NotThreadSafe private static final class IncrementSequence { private final long sequence; private long delta = 0; @@ -100,7 +101,7 @@ abstract class AbstractProxyTransaction implements Identifiable data) { + final void merge(final YangInstanceIdentifier path, final NormalizedNode data) { checkReadWrite(); checkNotSealed(); doMerge(path, data); } - final void write(final YangInstanceIdentifier path, final NormalizedNode data) { + final void write(final YangInstanceIdentifier path, final NormalizedNode data) { checkReadWrite(); checkNotSealed(); doWrite(path, data); } - final CheckedFuture exists(final YangInstanceIdentifier path) { + final FluentFuture exists(final YangInstanceIdentifier path) { checkNotSealed(); return doExists(path); } - final CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + final FluentFuture> read(final YangInstanceIdentifier path) { checkNotSealed(); return doRead(path); } @@ -327,49 +328,90 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + forwardToSuccessor(successor, optState.orElseThrow(), null); } + successor.predecessorSealed(); } - private void internalSeal() { - doSeal(); - parent.onTransactionSealed(this); + private void predecessorSealed() { + if (markSealed() && !sealAndSend(OptionalLong.empty())) { + sealSuccessor(); + } + } - // Now deal with state transfer, which can occur via successor or a follow-up canCommit() or directCommit(). - if (!STATE_UPDATER.compareAndSet(this, OPEN, SEALED)) { - // Slow path: wait for the successor to complete - final AbstractProxyTransaction successor = awaitSuccessor(); + /** + * Seal this transaction. If this method reports false, the caller needs to deal with propagating the seal operation + * towards the successor. + * + * @return True if seal operation was successful, false if this proxy has a successor. + */ + boolean sealOnly() { + return sealState(); + } - // At this point the successor has completed transition and is possibly visible by the user thread, which is - // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed. - // Propagate state and seal the successor. - flushState(successor); - successor.ensureSealed(); - } + /** + * Seal this transaction and potentially send it out towards the backend. If this method reports false, the caller + * needs to deal with propagating the seal operation towards the successor. + * + * @param enqueuedTicks Enqueue ticks when this is invoked from replay path. + * @return True if seal operation was successful, false if this proxy has a successor. + */ + boolean sealAndSend(final OptionalLong enqueuedTicks) { + return sealState(); + } + + private boolean sealState() { + parent.onTransactionSealed(this); + // Transition internal state to sealed and detect presence of a successor + return STATE_UPDATER.compareAndSet(this, OPEN, SEALED); + } + + /** + * Mark this proxy as having been sealed. + * + * @return True if this call has transitioned to sealed state. + */ + final boolean markSealed() { + return SEALED_UPDATER.compareAndSet(this, 0, 1); } private void checkNotSealed() { - Preconditions.checkState(sealed == 0, "Transaction %s has already been sealed", getIdentifier()); + checkState(sealed == 0, "Transaction %s has already been sealed", getIdentifier()); } private void checkSealed() { - Preconditions.checkState(sealed != 0, "Transaction %s has not been sealed yet", getIdentifier()); + checkState(sealed != 0, "Transaction %s has not been sealed yet", getIdentifier()); } private SuccessorState getSuccessorState() { final State local = state; - Verify.verify(local instanceof SuccessorState, "State %s has unexpected class", local); + verify(local instanceof SuccessorState, "State %s has unexpected class", local); return (SuccessorState) local; } @@ -379,8 +421,8 @@ abstract class AbstractProxyTransaction implements Identifiable req) { - successfulRequests.add(Verify.verifyNotNull(req)); + final void recordSuccessfulRequest(final @NonNull TransactionRequest req) { + successfulRequests.add(verifyNotNull(req)); } final void recordFinishedRequest(final Response response) { @@ -406,7 +448,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + final void abort(final VotingFuture ret) { checkSealed(); sendDoAbort(t -> { @@ -415,7 +457,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } // This is a terminal request, hence we do not need to record it @@ -460,7 +502,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret = SettableFuture.create(); - sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { + sendRequest(verifyNotNull(commitRequest(false)), t -> { if (t instanceof TransactionCommitSuccess) { ret.set(Boolean.TRUE); } else if (t instanceof RequestFailure) { @@ -473,7 +515,7 @@ abstract class AbstractProxyTransaction implements Identifiable req = Verify.verifyNotNull(commitRequest(true)); + final TransactionRequest req = verifyNotNull(commitRequest(true)); sendRequest(req, t -> { if (t instanceof TransactionCanCommitSuccess) { @@ -504,7 +546,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } recordSuccessfulRequest(req); @@ -535,7 +577,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } onPreCommitComplete(req); @@ -568,7 +610,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } LOG.debug("Transaction {} doCommit completed", this); @@ -622,7 +664,7 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); + successor.doReplayRequest((TransactionRequest) obj, resp -> { /*NOOP*/ }, now); } else { - Verify.verify(obj instanceof IncrementSequence); + verify(obj instanceof IncrementSequence); final IncrementSequence increment = (IncrementSequence) obj; successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), - increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, - now); + increment.getSequence(), localActor(), isSnapshotOnly(), + increment.getDelta()), resp -> { /*NOOP*/ }, now); LOG.debug("Incrementing sequence {} to successor {}", obj, successor); } } @@ -674,7 +716,7 @@ abstract class AbstractProxyTransaction implements Identifiable req = e.getRequest(); if (getIdentifier().equals(req.getTarget())) { - Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req); + verify(req instanceof TransactionRequest, "Unhandled request %s", req); LOG.debug("Replaying queued request {} to successor {}", req, successor); successor.doReplayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); @@ -688,8 +730,14 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + successor.handleReplayedRemoteRequest(optState.orElseThrow(), null, enqueuedTicks); + } + if (successor.markSealed()) { + successor.sealAndSend(OptionalLong.of(enqueuedTicks)); + } } } @@ -753,18 +801,16 @@ abstract class AbstractProxyTransaction implements Identifiable data); - - abstract void doWrite(YangInstanceIdentifier path, NormalizedNode data); + abstract void doMerge(YangInstanceIdentifier path, NormalizedNode data); - abstract CheckedFuture doExists(YangInstanceIdentifier path); + abstract void doWrite(YangInstanceIdentifier path, NormalizedNode data); - abstract CheckedFuture>, ReadFailedException> doRead(YangInstanceIdentifier path); + abstract FluentFuture doExists(YangInstanceIdentifier path); - abstract void doSeal(); + abstract FluentFuture> doRead(YangInstanceIdentifier path); @GuardedBy("this") - abstract void flushState(AbstractProxyTransaction successor); + abstract Optional flushState(); abstract TransactionRequest abortRequest(); @@ -808,6 +854,14 @@ abstract class AbstractProxyTransaction implements Identifiable request, @Nullable Consumer> callback, long enqueuedTicks); + static final @NonNull IllegalArgumentException unhandledRequest(final TransactionRequest request) { + return new IllegalArgumentException("Unhandled request " + request); + } + + private static @NonNull IllegalStateException unhandledResponseException(final Response resp) { + return new IllegalStateException("Unhandled response " + resp.getClass()); + } + @Override public final String toString() { return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();