import akka.actor.ActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
* @author Robert Varga
*/
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+ /**
+ * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
+ * and allows compressing multiple requests into a single entry.
+ */
+ @NotThreadSafe
+ private static final class IncrementSequence {
+ private long delta = 1;
+
+ long getDelta() {
+ return delta;
+ }
+
+ void incrementDelta() {
+ delta++;
+ }
+ }
+
+ private enum SealState {
+ /**
+ * The user has not sealed the transaction yet.
+ */
+ OPEN,
+ /**
+ * The user has sealed the transaction, but has not issued a canCommit().
+ */
+ SEALED,
+ /**
+ * The user has sealed the transaction and has issued a canCommit().
+ */
+ FLUSHED,
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+ private final Deque<Object> successfulRequests = new ArrayDeque<>();
private final ProxyHistory parent;
+ /*
+ * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
+ * the backend -- which may include a successor.
+ *
+ * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
+ * the successor placement needs to be atomic with regard to the application thread.
+ *
+ * In the common case, the application thread performs performs the seal operations and then "immediately" sends
+ * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
+ * or timeout, when a successor is injected.
+ *
+ * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
+ * after a successor was injected, so that it can be properly sealed if we are racing.
+ */
+ private volatile SealState sealed = SealState.OPEN;
+ @GuardedBy("this")
private AbstractProxyTransaction successor;
+ @GuardedBy("this")
+ private CountDownLatch successorLatch;
+
+ // Accessed from user thread only, which may not access this object concurrently
private long sequence;
- private boolean sealed;
+
AbstractProxyTransaction(final ProxyHistory parent) {
this.parent = Preconditions.checkNotNull(parent);
return parent.localActor();
}
+ private void incrementSequence(final long delta) {
+ sequence += delta;
+ LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
+ }
+
final long nextSequence() {
- return sequence++;
+ final long ret = sequence++;
+ LOG.debug("Transaction {} allocated sequence {}", this, ret);
+ return ret;
}
final void delete(final YangInstanceIdentifier path) {
* Seal this transaction before it is either committed or aborted.
*/
final void seal() {
- checkNotSealed();
- doSeal();
- sealed = true;
- parent.onTransactionSealed(this);
+ final CountDownLatch localLatch;
+
+ synchronized (this) {
+ checkNotSealed();
+ doSeal();
+
+ // Fast path: no successor
+ if (successor == null) {
+ sealed = SealState.SEALED;
+ parent.onTransactionSealed(this);
+ return;
+ }
+
+ localLatch = successorLatch;
+ }
+
+ // Slow path: wait for the latch
+ LOG.debug("{} waiting on successor latch", getIdentifier());
+ try {
+ localLatch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+ throw Throwables.propagate(e);
+ }
+
+ synchronized (this) {
+ LOG.debug("{} reacquired lock", getIdentifier());
+
+ flushState(successor);
+ successor.seal();
+
+ sealed = SealState.FLUSHED;
+ parent.onTransactionSealed(this);
+ }
}
private void checkNotSealed() {
- Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
+ Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier());
+ }
+
+ private SealState checkSealed() {
+ final SealState local = sealed;
+ Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier());
+ return local;
+ }
+
+ final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
+ successfulRequests.add(Verify.verifyNotNull(req));
}
- private void checkSealed() {
- Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
+ final void recordFinishedRequest() {
+ final Object last = successfulRequests.peekLast();
+ if (last instanceof IncrementSequence) {
+ ((IncrementSequence) last).incrementDelta();
+ } else {
+ successfulRequests.addLast(new IncrementSequence());
+ }
}
/**
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} abort completed", this);
parent.completeTransaction(this);
});
}
* @return Future completion
*/
final ListenableFuture<Boolean> directCommit() {
- checkSealed();
-
- final SettableFuture<Boolean> ret = SettableFuture.create();
- sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
- if (t instanceof TransactionCommitSuccess) {
- ret.set(Boolean.TRUE);
- } else if (t instanceof RequestFailure) {
- ret.setException(((RequestFailure<?, ?>) t).getCause());
- } else {
- ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+ final CountDownLatch localLatch;
+
+ synchronized (this) {
+ final SealState local = checkSealed();
+
+ // Fast path: no successor asserted
+ if (successor == null) {
+ Verify.verify(local == SealState.SEALED);
+
+ final SettableFuture<Boolean> ret = SettableFuture.create();
+ sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
+ if (t instanceof TransactionCommitSuccess) {
+ ret.set(Boolean.TRUE);
+ } else if (t instanceof RequestFailure) {
+ ret.setException(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} directCommit completed", this);
+ parent.completeTransaction(this);
+ });
+
+ sealed = SealState.FLUSHED;
+ return ret;
}
- parent.completeTransaction(this);
- });
- return ret;
+ // We have a successor, take its latch
+ localLatch = successorLatch;
+ }
+
+ // Slow path: we need to wait for the successor to completely propagate
+ LOG.debug("{} waiting on successor latch", getIdentifier());
+ try {
+ localLatch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+ throw Throwables.propagate(e);
+ }
+
+ synchronized (this) {
+ LOG.debug("{} reacquired lock", getIdentifier());
+
+ final SealState local = sealed;
+ Verify.verify(local == SealState.FLUSHED);
+
+ return successor.directCommit();
+ }
}
+ final void canCommit(final VotingFuture<?> ret) {
+ final CountDownLatch localLatch;
- void canCommit(final VotingFuture<?> ret) {
- checkSealed();
+ synchronized (this) {
+ final SealState local = checkSealed();
- sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
- if (t instanceof TransactionCanCommitSuccess) {
- ret.voteYes();
- } else if (t instanceof RequestFailure) {
- ret.voteNo(((RequestFailure<?, ?>) t).getCause());
- } else {
- ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ // Fast path: no successor asserted
+ if (successor == null) {
+ Verify.verify(local == SealState.SEALED);
+
+ final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+ sendRequest(req, t -> {
+ if (t instanceof TransactionCanCommitSuccess) {
+ ret.voteYes();
+ } else if (t instanceof RequestFailure) {
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} canCommit completed", this);
+ });
+
+ sealed = SealState.FLUSHED;
+ return;
}
- });
+
+ // We have a successor, take its latch
+ localLatch = successorLatch;
+ }
+
+ // Slow path: we need to wait for the successor to completely propagate
+ LOG.debug("{} waiting on successor latch", getIdentifier());
+ try {
+ localLatch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+ throw Throwables.propagate(e);
+ }
+
+ synchronized (this) {
+ LOG.debug("{} reacquired lock", getIdentifier());
+
+ final SealState local = sealed;
+ Verify.verify(local == SealState.FLUSHED);
+
+ successor.canCommit(ret);
+ }
}
- void preCommit(final VotingFuture<?> ret) {
+ final void preCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+ final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+ localActor());
+ sendRequest(req, t -> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} preCommit completed", this);
});
}
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ LOG.debug("Transaction {} doCommit completed", this);
parent.completeTransaction(this);
});
}
- void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+ final synchronized void startReconnect(final AbstractProxyTransaction successor) {
+ Preconditions.checkState(this.successor == null);
this.successor = Preconditions.checkNotNull(successor);
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding request {} to successor {}", obj, successor);
+ successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ }
+ }
+ LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+ successfulRequests.clear();
+
+ /*
+ * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed
+ * finishConnect().
+ */
+ successorLatch = new CountDownLatch(1);
+ }
+
+ final synchronized void finishReconnect() {
+ Preconditions.checkState(successorLatch != null);
+
+ if (sealed == SealState.SEALED) {
+ /*
+ * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current
+ * leftover state to the successor now.
+ */
+ flushState(successor);
+ successor.seal();
+ sealed = SealState.FLUSHED;
+ }
+
+ // All done, release the latch, unblocking seal() and canCommit()
+ successorLatch.countDown();
}
/**
* @param callback Original callback
* @throws RequestException when the request is unhandled by the successor
*/
- final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
- throws RequestException {
+ final synchronized void replayRequest(final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
Preconditions.checkState(successor != null, "%s does not have a successor set", this);
if (successor instanceof LocalProxyTransaction) {
abstract void doAbort();
+ @GuardedBy("this")
+ abstract void flushState(AbstractProxyTransaction successor);
+
abstract TransactionRequest<?> commitRequest(boolean coordinated);
/**
* Replay a request originating in this proxy to a successor remote proxy.
*/
abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
- Consumer<Response<?, ?>> callback) throws RequestException;
+ Consumer<Response<?, ?>> callback);
/**
* Replay a request originating in this proxy to a successor local proxy.
*/
abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
- Consumer<Response<?, ?>> callback) throws RequestException;
+ Consumer<Response<?, ?>> callback);
}