- final synchronized void startReconnect(final AbstractProxyTransaction successor) {
- Preconditions.checkState(this.successor == null);
- this.successor = Preconditions.checkNotNull(successor);
+ // Called with the connection unlocked
+ final synchronized void startReconnect() {
+ // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
+ // state. This method is called with the queue still unlocked.
+ final SuccessorState nextState = new SuccessorState();
+ final State prevState = STATE_UPDATER.getAndSet(this, nextState);
+
+ LOG.debug("Start reconnect of proxy {} previous state {}", this, prevState);
+ Verify.verify(!(prevState instanceof SuccessorState), "Proxy %s duplicate reconnect attempt after %s", this,
+ prevState);
+
+ // We have asserted a slow-path state, seal(), canCommit(), directCommit() are forced to slow paths, which will
+ // wait until we unblock nextState's latch before accessing state. Now we record prevState for later use and we
+ // are done.
+ nextState.setPrevState(prevState);
+ }
+
+ // Called with the connection locked
+ final void replayMessages(final AbstractProxyTransaction successor,
+ final Iterable<ConnectionEntry> enqueuedEntries) {
+ final SuccessorState local = getSuccessorState();
+ local.setSuccessor(successor);