From: Robert Varga Date: Wed, 30 Nov 2016 14:07:58 +0000 (+0100) Subject: BUG-5280: fix transaction seal atomicity X-Git-Tag: release/carbon~373 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b5444f8c2c10ded63d6a9e890db61b0f3aa2095e BUG-5280: fix transaction seal atomicity AbstractProxyTransaction.seal() indicates that the user is done with the transaction. This transition needs to be atomically propagated to successors on reconnect, such that the user will always observe sealed proxies. More importantly this state is propagated to parent ProxyHistory, where it drives the state machine in ClientProxyHistory -- and failing to mark the successor as sealed will wreck that. Unfortunately an AbstractProxyTransaction does not forward all of the state on seal(), but rather when the resulting commit cohort initiates commit -- which means we have to perform three-way synchronization between seal()/(can|direct)Commit/finishReconnect, to ensure we flush state towards the backend exactly once. To do that, we guard the methods involved with locking for split them into fast/slow paths and add an explicit flushState() method by which subclasses forward their current unsent state to their successor. This solution is correct but a bit heavy-handed, so it will be further optimized in a follow-up patch. Change-Id: Id5f156dc18faef5b9184c3e2e3d24f7af1b18841 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 803908d8c6..7f5bec1ff6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -10,15 +10,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -52,6 +56,11 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ abstract class AbstractProxyTransaction implements Identifiable { + /** + * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint + * and allows compressing multiple requests into a single entry. + */ + @NotThreadSafe private static final class IncrementSequence { private long delta = 1; @@ -64,14 +73,49 @@ abstract class AbstractProxyTransaction implements Identifiable successfulRequests = new ArrayDeque<>(); private final ProxyHistory parent; + /* + * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards + * the backend -- which may include a successor. + * + * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means + * the successor placement needs to be atomic with regard to the application thread. + * + * In the common case, the application thread performs performs the seal operations and then "immediately" sends + * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion + * or timeout, when a successor is injected. + * + * This leaves the problem of needing to completely transferring state just after all queued messages are replayed + * after a successor was injected, so that it can be properly sealed if we are racing. + */ + private volatile SealState sealed = SealState.OPEN; + @GuardedBy("this") private AbstractProxyTransaction successor; + @GuardedBy("this") + private CountDownLatch successorLatch; + + // Accessed from user thread only, which may not access this object concurrently private long sequence; - private boolean sealed; + AbstractProxyTransaction(final ProxyHistory parent) { this.parent = Preconditions.checkNotNull(parent); @@ -126,18 +170,50 @@ abstract class AbstractProxyTransaction implements Identifiable req) { @@ -192,45 +268,109 @@ abstract class AbstractProxyTransaction implements Identifiable directCommit() { - checkSealed(); - - final SettableFuture ret = SettableFuture.create(); - sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { - if (t instanceof TransactionCommitSuccess) { - ret.set(Boolean.TRUE); - } else if (t instanceof RequestFailure) { - ret.setException(((RequestFailure) t).getCause()); - } else { - ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); + final CountDownLatch localLatch; + + synchronized (this) { + final SealState local = checkSealed(); + + // Fast path: no successor asserted + if (successor == null) { + Verify.verify(local == SealState.SEALED); + + final SettableFuture ret = SettableFuture.create(); + sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { + if (t instanceof TransactionCommitSuccess) { + ret.set(Boolean.TRUE); + } else if (t instanceof RequestFailure) { + ret.setException(((RequestFailure) t).getCause()); + } else { + ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); + } + + // This is a terminal request, hence we do not need to record it + LOG.debug("Transaction {} directCommit completed", this); + parent.completeTransaction(this); + }); + + sealed = SealState.FLUSHED; + return ret; } - // This is a terminal request, hence we do not need to record it - LOG.debug("Transaction {} directCommit completed", this); - parent.completeTransaction(this); - }); - return ret; + // We have a successor, take its latch + localLatch = successorLatch; + } + + // Slow path: we need to wait for the successor to completely propagate + LOG.debug("{} waiting on successor latch", getIdentifier()); + try { + localLatch.await(); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting for latch", getIdentifier()); + throw Throwables.propagate(e); + } + + synchronized (this) { + LOG.debug("{} reacquired lock", getIdentifier()); + + final SealState local = sealed; + Verify.verify(local == SealState.FLUSHED); + + return successor.directCommit(); + } } + final void canCommit(final VotingFuture ret) { + final CountDownLatch localLatch; - void canCommit(final VotingFuture ret) { - checkSealed(); + synchronized (this) { + final SealState local = checkSealed(); - final TransactionRequest req = Verify.verifyNotNull(commitRequest(true)); - sendRequest(req, t -> { - if (t instanceof TransactionCanCommitSuccess) { - ret.voteYes(); - } else if (t instanceof RequestFailure) { - ret.voteNo(((RequestFailure) t).getCause()); - } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + // Fast path: no successor asserted + if (successor == null) { + Verify.verify(local == SealState.SEALED); + + final TransactionRequest req = Verify.verifyNotNull(commitRequest(true)); + sendRequest(req, t -> { + if (t instanceof TransactionCanCommitSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + + recordSuccessfulRequest(req); + LOG.debug("Transaction {} canCommit completed", this); + }); + + sealed = SealState.FLUSHED; + return; } - recordSuccessfulRequest(req); - LOG.debug("Transaction {} canCommit completed", this); - }); + // We have a successor, take its latch + localLatch = successorLatch; + } + + // Slow path: we need to wait for the successor to completely propagate + LOG.debug("{} waiting on successor latch", getIdentifier()); + try { + localLatch.await(); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting for latch", getIdentifier()); + throw Throwables.propagate(e); + } + + synchronized (this) { + LOG.debug("{} reacquired lock", getIdentifier()); + + final SealState local = sealed; + Verify.verify(local == SealState.FLUSHED); + + successor.canCommit(ret); + } } - void preCommit(final VotingFuture ret) { + final void preCommit(final VotingFuture ret) { checkSealed(); final TransactionRequest req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), @@ -266,7 +406,8 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback) - throws RequestException { + final synchronized void replayRequest(final TransactionRequest request, + final Consumer> callback) { Preconditions.checkState(successor != null, "%s does not have a successor set", this); if (successor instanceof LocalProxyTransaction) { @@ -318,6 +482,9 @@ abstract class AbstractProxyTransaction implements Identifiable commitRequest(boolean coordinated); /** @@ -338,11 +505,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, - Consumer> callback) throws RequestException; + Consumer> callback); /** * Replay a request originating in this proxy to a successor local proxy. */ abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, - Consumer> callback) throws RequestException; + Consumer> callback); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 61f83c2db1..e9941179c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionModificati import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; @@ -40,7 +39,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -69,7 +67,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { private final TransactionIdentifier identifier; private CursorAwareDataTreeModification modification; - private CursorAwareDataTreeSnapshot sealedModification; + private CursorAwareDataTreeModification sealedModification; LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final CursorAwareDataTreeModification modification) { @@ -137,6 +135,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { sealedModification = modification; } + @Override + void flushState(final AbstractProxyTransaction successor) { + sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + } + DataTreeSnapshot getSnapshot() { Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier); return sealedModification; @@ -205,7 +223,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { if (request instanceof CommitLocalTransactionRequest) { final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; final DataTreeModification mod = req.getModification(); @@ -242,7 +260,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof CommitLocalTransactionRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 5257c6c7f6..07fcbebad0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -200,7 +200,7 @@ abstract class ProxyHistory implements Identifiable { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.replaySuccessfulRequests(newProxy); + t.startReconnect(newProxy); } } @@ -208,6 +208,11 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory finishReconnect() { final ProxyHistory ret = Verify.verifyNotNull(successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.finishReconnect(); + } + LOG.debug("Finished reconnecting proxy history {}", this); lock.unlock(); return ret; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 175483855f..783096b7bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -261,14 +260,22 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // No-op } + @Override + void flushState(final AbstractProxyTransaction successor) { + if (builderBusy) { + final ModifyTransactionRequest request = builder.build(); + builderBusy = false; + successor.handleForwardedRemoteRequest(request, null); + } + } + @Override void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { successor.handleForwardedRequest(request, callback); } - private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) - throws RequestException { + private void handleForwardedRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { final ModifyTransactionRequest req = (ModifyTransactionRequest) request; @@ -316,7 +323,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, - final Consumer> callback) throws RequestException { + final Consumer> callback) { successor.handleForwardedRemoteRequest(request, callback); } }