+ // Generic state base class. Direct instances are used for fast paths, sub-class is used for successor transitions
+ private static class State {
+ private final String string;
+
+ State(final String string) {
+ this.string = Preconditions.checkNotNull(string);
+ }
+
+ @Override
+ public final String toString() {
+ return string;
+ }
+ }
+
+ // State class used when a successor has interfered. Contains coordinator latch, the successor and previous state
+ private static final class SuccessorState extends State {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private AbstractProxyTransaction successor;
+ private State prevState;
+
+ SuccessorState() {
+ super("successor");
+ }
+
+ // Synchronize with succession process and return the successor
+ AbstractProxyTransaction await() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for latch of {}", successor);
+ throw Throwables.propagate(e);
+ }
+ return successor;
+ }
+
+ void finish() {
+ latch.countDown();
+ }
+
+ State getPrevState() {
+ return prevState;
+ }
+
+ void setPrevState(final State prevState) {
+ Verify.verify(this.prevState == null);
+ this.prevState = Preconditions.checkNotNull(prevState);
+ }
+
+ // To be called from safe contexts, where successor is known to be completed
+ AbstractProxyTransaction getSuccessor() {
+ return Verify.verifyNotNull(successor);
+ }
+
+ void setSuccessor(final AbstractProxyTransaction successor) {
+ Verify.verify(this.successor == null);
+ this.successor = Preconditions.checkNotNull(successor);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+ private static final AtomicIntegerFieldUpdater<AbstractProxyTransaction> SEALED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed");
+ private static final AtomicReferenceFieldUpdater<AbstractProxyTransaction, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state");
+ private static final State OPEN = new State("open");
+ private static final State SEALED = new State("sealed");
+ private static final State FLUSHED = new State("flushed");
+
+ // Touched from client actor thread only
+ private final Deque<Object> successfulRequests = new ArrayDeque<>();
+ private final ProxyHistory parent;
+
+ // Accessed from user thread only, which may not access this object concurrently
+ private long sequence;
+
+ /*
+ * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
+ * the backend -- which may include a successor.
+ *
+ * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
+ * the successor placement needs to be atomic with regard to the application thread.
+ *
+ * In the common case, the application thread performs performs the seal operations and then "immediately" sends
+ * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
+ * or timeout, when a successor is injected.
+ *
+ * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
+ * after a successor was injected, so that it can be properly sealed if we are racing. Further complication comes
+ * from lock ordering, where the successor injection works with a locked queue and locks proxy objects -- leading
+ * to a potential AB-BA deadlock in case of a naive implementation.