X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractProxyTransaction.java;h=7f5bec1ff6c25dd7dd0650217e327e96a82c9e5c;hb=b5444f8c2c10ded63d6a9e890db61b0f3aa2095e;hp=60919c05a42c2ae76b856e14315a0d1bdcac1653;hpb=98d1c5606bad9633ce5549bcd691a98c75abdf6a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index 60919c05a4..7f5bec1ff6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -7,12 +7,22 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import akka.actor.ActorRef; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -21,105 +31,202 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class translating transaction operations towards a particular backend shard. * + *

* This class is not safe to access from multiple application threads, as is usual for transactions. Internal state * transitions coming from interactions with backend are expected to be thread-safe. * + *

* This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction. * * @author Robert Varga */ abstract class AbstractProxyTransaction implements Identifiable { - private final DistributedDataStoreClientBehavior client; + /** + * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint + * and allows compressing multiple requests into a single entry. + */ + @NotThreadSafe + private static final class IncrementSequence { + private long delta = 1; - private long sequence; - private boolean sealed; + long getDelta() { + return delta; + } - AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) { - this.client = Preconditions.checkNotNull(client); + void incrementDelta() { + delta++; + } } - /** - * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use - * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote - * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until - * the backend is located. + private enum SealState { + /** + * The user has not sealed the transaction yet. + */ + OPEN, + /** + * The user has sealed the transaction, but has not issued a canCommit(). + */ + SEALED, + /** + * The user has sealed the transaction and has issued a canCommit(). + */ + FLUSHED, + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class); + + private final Deque successfulRequests = new ArrayDeque<>(); + private final ProxyHistory parent; + + /* + * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards + * the backend -- which may include a successor. + * + * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means + * the successor placement needs to be atomic with regard to the application thread. * - * @param client Client behavior - * @param historyId Local history identifier - * @param transactionId Transaction identifier - * @param backend Optional backend identifier - * @return A new state tracker + * In the common case, the application thread performs performs the seal operations and then "immediately" sends + * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion + * or timeout, when a successor is injected. + * + * This leaves the problem of needing to completely transferring state just after all queued messages are replayed + * after a successor was injected, so that it can be properly sealed if we are racing. */ - static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client, - final LocalHistoryIdentifier historyId, final long transactionId, - final java.util.Optional backend) { - - final java.util.Optional dataTree = backend.flatMap(t -> t.getDataTree()); - final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId); - if (dataTree.isPresent()) { - return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot()); - } else { - return new RemoteProxyTransaction(client, identifier); - } + private volatile SealState sealed = SealState.OPEN; + @GuardedBy("this") + private AbstractProxyTransaction successor; + @GuardedBy("this") + private CountDownLatch successorLatch; + + // Accessed from user thread only, which may not access this object concurrently + private long sequence; + + + AbstractProxyTransaction(final ProxyHistory parent) { + this.parent = Preconditions.checkNotNull(parent); + } + + final ActorRef localActor() { + return parent.localActor(); } - final DistributedDataStoreClientBehavior client() { - return client; + private void incrementSequence(final long delta) { + sequence += delta; + LOG.debug("Transaction {} incremented sequence to {}", this, sequence); } final long nextSequence() { - return sequence++; + final long ret = sequence++; + LOG.debug("Transaction {} allocated sequence {}", this, ret); + return ret; } final void delete(final YangInstanceIdentifier path) { - checkSealed(); + checkNotSealed(); doDelete(path); } final void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - checkSealed(); + checkNotSealed(); doMerge(path, data); } final void write(final YangInstanceIdentifier path, final NormalizedNode data) { - checkSealed(); + checkNotSealed(); doWrite(path, data); } final CheckedFuture exists(final YangInstanceIdentifier path) { - checkSealed(); + checkNotSealed(); return doExists(path); } final CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - checkSealed(); + checkNotSealed(); return doRead(path); } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { + LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback); + parent.sendRequest(request, callback); + } + /** - * Seal this transaction before it is either + * Seal this transaction before it is either committed or aborted. */ final void seal() { - checkSealed(); - doSeal(); - sealed = true; + final CountDownLatch localLatch; + + synchronized (this) { + checkNotSealed(); + doSeal(); + + // Fast path: no successor + if (successor == null) { + sealed = SealState.SEALED; + parent.onTransactionSealed(this); + return; + } + + localLatch = successorLatch; + } + + // Slow path: wait for the latch + LOG.debug("{} waiting on successor latch", getIdentifier()); + try { + localLatch.await(); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting for latch", getIdentifier()); + throw Throwables.propagate(e); + } + + synchronized (this) { + LOG.debug("{} reacquired lock", getIdentifier()); + + flushState(successor); + successor.seal(); + + sealed = SealState.FLUSHED; + parent.onTransactionSealed(this); + } + } + + private void checkNotSealed() { + Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier()); } - private void checkSealed() { - Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier()); + private SealState checkSealed() { + final SealState local = sealed; + Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier()); + return local; + } + + final void recordSuccessfulRequest(final @Nonnull TransactionRequest req) { + successfulRequests.add(Verify.verifyNotNull(req)); + } + + final void recordFinishedRequest() { + final Object last = successfulRequests.peekLast(); + if (last instanceof IncrementSequence) { + ((IncrementSequence) last).incrementDelta(); + } else { + successfulRequests.addLast(new IncrementSequence()); + } } /** @@ -127,36 +234,15 @@ abstract class AbstractProxyTransaction implements Identifiable directCommit() { - checkSealed(); - - final SettableFuture ret = SettableFuture.create(); - client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> { - if (t instanceof TransactionCommitSuccess) { - ret.set(Boolean.TRUE); - } else if (t instanceof RequestFailure) { - ret.setException(((RequestFailure) t).getCause()); - } else { - ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); - } - }); - return ret; - } - - void abort(final VotingFuture ret) { + final void abort(final VotingFuture ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> { + sendAbort(t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -164,27 +250,132 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { - checkSealed(); + final void sendAbort(final Consumer> callback) { + sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback); + } - client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> { - if (t instanceof TransactionCanCommitSuccess) { - ret.voteYes(); - } else if (t instanceof RequestFailure) { - ret.voteNo(((RequestFailure) t).getCause()); - } else { - ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + /** + * Commit this transaction, possibly in a coordinated fashion. + * + * @param coordinated True if this transaction should be coordinated across multiple participants. + * @return Future completion + */ + final ListenableFuture directCommit() { + final CountDownLatch localLatch; + + synchronized (this) { + final SealState local = checkSealed(); + + // Fast path: no successor asserted + if (successor == null) { + Verify.verify(local == SealState.SEALED); + + final SettableFuture ret = SettableFuture.create(); + sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> { + if (t instanceof TransactionCommitSuccess) { + ret.set(Boolean.TRUE); + } else if (t instanceof RequestFailure) { + ret.setException(((RequestFailure) t).getCause()); + } else { + ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); + } + + // This is a terminal request, hence we do not need to record it + LOG.debug("Transaction {} directCommit completed", this); + parent.completeTransaction(this); + }); + + sealed = SealState.FLUSHED; + return ret; } - }); + + // We have a successor, take its latch + localLatch = successorLatch; + } + + // Slow path: we need to wait for the successor to completely propagate + LOG.debug("{} waiting on successor latch", getIdentifier()); + try { + localLatch.await(); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting for latch", getIdentifier()); + throw Throwables.propagate(e); + } + + synchronized (this) { + LOG.debug("{} reacquired lock", getIdentifier()); + + final SealState local = sealed; + Verify.verify(local == SealState.FLUSHED); + + return successor.directCommit(); + } + } + + final void canCommit(final VotingFuture ret) { + final CountDownLatch localLatch; + + synchronized (this) { + final SealState local = checkSealed(); + + // Fast path: no successor asserted + if (successor == null) { + Verify.verify(local == SealState.SEALED); + + final TransactionRequest req = Verify.verifyNotNull(commitRequest(true)); + sendRequest(req, t -> { + if (t instanceof TransactionCanCommitSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + + recordSuccessfulRequest(req); + LOG.debug("Transaction {} canCommit completed", this); + }); + + sealed = SealState.FLUSHED; + return; + } + + // We have a successor, take its latch + localLatch = successorLatch; + } + + // Slow path: we need to wait for the successor to completely propagate + LOG.debug("{} waiting on successor latch", getIdentifier()); + try { + localLatch.await(); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting for latch", getIdentifier()); + throw Throwables.propagate(e); + } + + synchronized (this) { + LOG.debug("{} reacquired lock", getIdentifier()); + + final SealState local = sealed; + Verify.verify(local == SealState.FLUSHED); + + successor.canCommit(ret); + } } - void preCommit(final VotingFuture ret) { + final void preCommit(final VotingFuture ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> { + final TransactionRequest req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + sendRequest(req, t -> { if (t instanceof TransactionPreCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -192,13 +383,16 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> { + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> { if (t instanceof TransactionCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -206,9 +400,73 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, null); + } else { + Verify.verify(obj instanceof IncrementSequence); + successor.incrementSequence(((IncrementSequence) obj).getDelta()); + } + } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); + + /* + * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed + * finishConnect(). + */ + successorLatch = new CountDownLatch(1); + } + + final synchronized void finishReconnect() { + Preconditions.checkState(successorLatch != null); + + if (sealed == SealState.SEALED) { + /* + * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current + * leftover state to the successor now. + */ + flushState(successor); + successor.seal(); + sealed = SealState.FLUSHED; + } + + // All done, release the latch, unblocking seal() and canCommit() + successorLatch.countDown(); + } + + /** + * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted + * and forwarded to the successor connection. + * + * @param request Request to be forwarded + * @param callback Original callback + * @throws RequestException when the request is unhandled by the successor + */ + final synchronized void replayRequest(final TransactionRequest request, + final Consumer> callback) { + Preconditions.checkState(successor != null, "%s does not have a successor set", this); + + if (successor instanceof LocalProxyTransaction) { + forwardToLocal((LocalProxyTransaction)successor, request, callback); + } else if (successor instanceof RemoteProxyTransaction) { + forwardToRemote((RemoteProxyTransaction)successor, request, callback); + } else { + throw new IllegalStateException("Unhandled successor " + successor); + } + } + abstract void doDelete(final YangInstanceIdentifier path); abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode data); @@ -217,12 +475,41 @@ abstract class AbstractProxyTransaction implements Identifiable doExists(final YangInstanceIdentifier path); - abstract CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path); + abstract CheckedFuture>, ReadFailedException> doRead( + final YangInstanceIdentifier path); abstract void doSeal(); abstract void doAbort(); - abstract TransactionRequest doCommit(boolean coordinated); + @GuardedBy("this") + abstract void flushState(AbstractProxyTransaction successor); + + abstract TransactionRequest commitRequest(boolean coordinated); + /** + * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is + * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all + * operations are packaged in the message. + * + *

+ * Note: this method is invoked by the predecessor on the successor. + * + * @param request Request which needs to be forwarded + * @param callback Callback to be invoked once the request completes + */ + abstract void handleForwardedRemoteRequest(TransactionRequest request, + @Nullable Consumer> callback); + + /** + * Replay a request originating in this proxy to a successor remote proxy. + */ + abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest request, + Consumer> callback); + + /** + * Replay a request originating in this proxy to a successor local proxy. + */ + abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest request, + Consumer> callback); }