+ /**
+ * Invoked from {@link #replayMessages(AbstractProxyTransaction, Iterable)} to have successor adopt an in-flight
+ * request.
+ *
+ * <p>
+ * Note: this method is invoked by the predecessor on the successor.
+ *
+ * @param request Request which needs to be forwarded
+ * @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks ticker-based time stamp when the request was enqueued
+ */
+ private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ if (request instanceof AbstractLocalTransactionRequest) {
+ handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
+ } else {
+ handleReplayedRemoteRequest(request, callback, enqueuedTicks);
+ }
+ }
+
+ // Called with the connection locked
+ final void finishReconnect() {
+ final SuccessorState local = getSuccessorState();
+ LOG.debug("Finishing reconnect of proxy {}", this);
+
+ // All done, release the latch, unblocking seal() and canCommit() slow paths
+ local.finish();
+ }
+
+ /**
+ * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
+ * and forwarded to the successor connection.
+ *
+ * @param request Request to be forwarded
+ * @param callback Original callback
+ */
+ final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ forwardToSuccessor(getSuccessorState().getSuccessor(), request, callback);
+ }
+
+ final void forwardToSuccessor(final AbstractProxyTransaction successor, final TransactionRequest<?> request,
+ final Consumer<Response<?, ?>> callback) {
+ if (successor instanceof LocalProxyTransaction) {
+ forwardToLocal((LocalProxyTransaction)successor, request, callback);
+ } else if (successor instanceof RemoteProxyTransaction) {
+ forwardToRemote((RemoteProxyTransaction)successor, request, callback);
+ } else {
+ throw new IllegalStateException("Unhandled successor " + successor);
+ }
+ }
+
+ final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks);
+ }
+
+ abstract boolean isSnapshotOnly();
+
+ abstract void doDelete(YangInstanceIdentifier path);
+
+ abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ abstract FluentFuture<Boolean> doExists(YangInstanceIdentifier path);
+
+ abstract FluentFuture<Optional<NormalizedNode<?, ?>>> doRead(YangInstanceIdentifier path);
+
+ @GuardedBy("this")
+ abstract java.util.Optional<ModifyTransactionRequest> flushState();
+
+ abstract TransactionRequest<?> abortRequest();
+
+ abstract TransactionRequest<?> commitRequest(boolean coordinated);
+
+ /**
+ * Replay a request originating in this proxy to a successor remote proxy.
+ */
+ abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
+ Consumer<Response<?, ?>> callback);
+
+ /**
+ * Replay a request originating in this proxy to a successor local proxy.
+ */
+ abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
+ Consumer<Response<?, ?>> callback);
+
+ /**
+ * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor.
+ *
+ * <p>
+ * Note: this method is invoked by the predecessor on the successor.
+ *
+ * @param request Request which needs to be forwarded
+ * @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks Time stamp to use for enqueue time
+ */
+ abstract void handleReplayedLocalRequest(AbstractLocalTransactionRequest<?> request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
+ /**
+ * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
+ *
+ * <p>
+ * Note: this method is invoked by the predecessor on the successor.
+ *
+ * @param request Request which needs to be forwarded
+ * @param callback Callback to be invoked once the request completes
+ * @param enqueuedTicks Time stamp to use for enqueue time
+ */
+ abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
+ @Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+
+ private static IllegalStateException unhandledResponseException(Response<?, ?> resp) {
+ return new IllegalStateException("Unhandled response " + resp.getClass());
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+ }