X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=e1919d1bb6c2a601486005425c5e5f6fe75423b4;hp=07b89e09230949da6c4849b3fb5dc03d4c3c36d8;hb=127042ea7e148d9dc0282acc3780b4754ca69e12;hpb=18ddbfdc55a1faddf7aeb2df6b25481d34c820ab diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 07b89e0923..e1919d1bb6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -9,29 +9,29 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.base.Verify; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException; import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -138,7 +137,7 @@ abstract class AbstractProxyTransaction implements Identifiable exists(final YangInstanceIdentifier path) { + final FluentFuture exists(final YangInstanceIdentifier path) { checkNotSealed(); return doExists(path); } - final CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + final FluentFuture>> read(final YangInstanceIdentifier path) { checkNotSealed(); return doRead(path); } @@ -335,7 +334,7 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + forwardToSuccessor(successor, optState.get(), null); + } successor.predecessorSealed(); } private void predecessorSealed() { - if (markSealed() && !sealAndSend(Optional.absent())) { + if (markSealed() && !sealAndSend(Optional.empty())) { sealSuccessor(); } } - void sealOnly() { - parent.onTransactionSealed(this); - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED); - Verify.verify(success, "Attempted to replay seal on {}", this); + /** + * Seal this transaction. If this method reports false, the caller needs to deal with propagating the seal operation + * towards the successor. + * + * @return True if seal operation was successful, false if this proxy has a successor. + */ + boolean sealOnly() { + return sealState(); } /** @@ -375,8 +381,11 @@ abstract class AbstractProxyTransaction implements Identifiable enqueuedTicks) { - parent.onTransactionSealed(this); + return sealState(); + } + private boolean sealState() { + parent.onTransactionSealed(this); // Transition internal state to sealed and detect presence of a successor return STATE_UPDATER.compareAndSet(this, OPEN, SEALED); } @@ -410,7 +419,7 @@ abstract class AbstractProxyTransaction implements Identifiable req) { + final void recordSuccessfulRequest(final @NonNull TransactionRequest req) { successfulRequests.add(Verify.verifyNotNull(req)); } @@ -446,7 +455,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } // This is a terminal request, hence we do not need to record it @@ -504,7 +513,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } recordSuccessfulRequest(req); @@ -566,7 +575,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } onPreCommitComplete(req); @@ -599,7 +608,7 @@ abstract class AbstractProxyTransaction implements Identifiable) t).getCause().unwrap()); } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + ret.voteNo(unhandledResponseException(t)); } LOG.debug("Transaction {} doCommit completed", this); @@ -684,13 +693,13 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); + successor.doReplayRequest((TransactionRequest) obj, resp -> { /*NOOP*/ }, now); } else { Verify.verify(obj instanceof IncrementSequence); final IncrementSequence increment = (IncrementSequence) obj; successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), - increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, - now); + increment.getSequence(), localActor(), isSnapshotOnly(), + increment.getDelta()), resp -> { /*NOOP*/ }, now); LOG.debug("Incrementing sequence {} to successor {}", obj, successor); } } @@ -719,9 +728,13 @@ abstract class AbstractProxyTransaction implements Identifiable optState = flushState(); + if (optState.isPresent()) { + successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks); + } if (successor.markSealed()) { - successor.sealAndSend(Optional.of(parent.currentTime())); + successor.sealAndSend(Optional.of(enqueuedTicks)); } } } @@ -790,12 +803,12 @@ abstract class AbstractProxyTransaction implements Identifiable data); - abstract CheckedFuture doExists(YangInstanceIdentifier path); + abstract FluentFuture doExists(YangInstanceIdentifier path); - abstract CheckedFuture>, ReadFailedException> doRead(YangInstanceIdentifier path); + abstract FluentFuture>> doRead(YangInstanceIdentifier path); @GuardedBy("this") - abstract void flushState(AbstractProxyTransaction successor); + abstract Optional flushState(); abstract TransactionRequest abortRequest(); @@ -839,6 +852,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, @Nullable Consumer> callback, long enqueuedTicks); + private static IllegalStateException unhandledResponseException(Response resp) { + return new IllegalStateException("Unhandled response " + resp.getClass()); + } + @Override public final String toString() { return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();