+ private void enqueuePurge() {
+ enqueuePurge(null);
+ }
+
+ final void enqueuePurge(final Consumer<Response<?, ?>> callback) {
+ // Purge request are dispatched internally, hence should not wait
+ enqueuePurge(callback, parent.currentTime());
+ }
+
+ final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ LOG.debug("{}: initiating purge", this);
+
+ final State prev = state;
+ if (prev instanceof SuccessorState) {
+ ((SuccessorState) prev).setDone();
+ } else {
+ final boolean success = STATE_UPDATER.compareAndSet(this, prev, DONE);
+ if (!success) {
+ LOG.warn("{}: moved from state {} while we were purging it", this, prev);
+ }
+ }
+
+ successfulRequests.clear();
+
+ enqueueRequest(new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor()), resp -> {
+ LOG.debug("{}: purge completed", this);
+ parent.purgeTransaction(this);
+
+ if (callback != null) {
+ callback.accept(resp);
+ }
+ }, enqueuedTicks);
+ }
+
+ // Called with the connection unlocked
+ final synchronized void startReconnect() {
+ // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
+ // state. This method is called with the queue still unlocked.
+ final SuccessorState nextState = new SuccessorState();
+ final State prevState = STATE_UPDATER.getAndSet(this, nextState);
+
+ LOG.debug("Start reconnect of proxy {} previous state {}", this, prevState);
+ verify(!(prevState instanceof SuccessorState), "Proxy %s duplicate reconnect attempt after %s", this,
+ prevState);
+
+ // We have asserted a slow-path state, seal(), canCommit(), directCommit() are forced to slow paths, which will
+ // wait until we unblock nextState's latch before accessing state. Now we record prevState for later use and we
+ // are done.
+ nextState.setPrevState(prevState);
+ }
+
+ // Called with the connection locked
+ final void replayMessages(final ProxyHistory successorHistory, final Iterable<ConnectionEntry> enqueuedEntries) {
+ final SuccessorState local = getSuccessorState();
+ final State prevState = local.getPrevState();
+
+ final AbstractProxyTransaction successor = successorHistory.createTransactionProxy(getIdentifier(),
+ isSnapshotOnly(), local.isDone());
+ LOG.debug("{} created successor {}", this, successor);
+ local.setSuccessor(successor);
+
+ // Replay successful requests first
+ if (!successfulRequests.isEmpty()) {
+ // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+ // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+ // time. We will pick the time of the first entry available. If there is none, we will just use current
+ // time, as all other requests will get enqueued afterwards.
+ final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+ final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+ successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { /*NOOP*/ }, now);
+ } else {
+ verify(obj instanceof IncrementSequence);
+ final IncrementSequence increment = (IncrementSequence) obj;
+ successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+ increment.getSequence(), localActor(), isSnapshotOnly(),
+ increment.getDelta()), resp -> { /*NOOP*/ }, now);
+ LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
+ }
+ }
+ LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+ successfulRequests.clear();
+ }
+
+ // Now replay whatever is in the connection
+ final Iterator<ConnectionEntry> it = enqueuedEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
+ final Request<?, ?> req = e.getRequest();
+
+ if (getIdentifier().equals(req.getTarget())) {
+ verify(req instanceof TransactionRequest, "Unhandled request %s", req);
+ LOG.debug("Replaying queued request {} to successor {}", req, successor);
+ successor.doReplayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
+ it.remove();
+ }
+ }
+
+ /*
+ * Check the state at which we have started the reconnect attempt. State transitions triggered while we were
+ * reconnecting have been forced to slow paths, which will be unlocked once we unblock the state latch
+ * at the end of this method.
+ */
+ if (SEALED.equals(prevState)) {
+ LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
+ final long enqueuedTicks = parent.currentTime();
+ final Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+ }
+ if (successor.markSealed()) {
+ successor.sealAndSend(OptionalLong.of(enqueuedTicks));
+ }
+ }
+ }
+
+ /**
+ * Invoked from {@link #replayMessages(AbstractProxyTransaction, Iterable)} to have successor adopt an in-flight
+ * request.
+ *
+ * <p>
+ * Note: this method is invoked by the predecessor on the successor.
+ *
+ * @param request Request which needs to be forwarded
+ * @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks ticker-based time stamp when the request was enqueued
+ */
+ private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ if (request instanceof AbstractLocalTransactionRequest) {
+ handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
+ } else {
+ handleReplayedRemoteRequest(request, callback, enqueuedTicks);
+ }
+ }
+
+ // Called with the connection locked
+ final void finishReconnect() {
+ final SuccessorState local = getSuccessorState();
+ LOG.debug("Finishing reconnect of proxy {}", this);
+
+ // All done, release the latch, unblocking seal() and canCommit() slow paths
+ local.finish();