+ private final class ResponseWaitingScheduler {
+
+ private ScheduledFuture<?> schedule;
+
+ public void initScheduler(final Runnable runnable) {
+ if (currentKeepalive != null) {
+ currentKeepalive.cancel(true);
+ } else {
+ LOG.trace("Keepalive does not exist.");
+ }
+ scheduleKeepalives();
+ //Listening on the result should be done before the keepalive rpc will be send
+ final long delay = (keepaliveDelaySeconds * 1000) - 500;
+ schedule = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler() {
+ if (schedule != null) {
+ schedule.cancel(true);
+ } else {
+ LOG.trace("Scheduler does not exist.");
+ }
+ }
+ }
+
+ private static final class ResponseWaiting implements Runnable {
+
+ private final FluentFuture<DOMRpcResult> rpcResultFuture;
+ private final ResponseWaitingScheduler responseWaitingScheduler;
+
+ ResponseWaiting(final ResponseWaitingScheduler responseWaitingScheduler,
+ final FluentFuture<DOMRpcResult> rpcResultFuture) {
+ this.responseWaitingScheduler = responseWaitingScheduler;
+ this.rpcResultFuture = rpcResultFuture;
+ }
+
+ public void start() {
+ LOG.trace("Start to waiting for result.");
+ responseWaitingScheduler.initScheduler(this);
+ }
+
+ public void stop() {
+ LOG.info("Stop to waiting for result.");
+ responseWaitingScheduler.stopScheduler();
+ }
+
+ @Override
+ public void run() {
+ if (!rpcResultFuture.isCancelled() && !rpcResultFuture.isDone()) {
+ LOG.trace("Waiting for result");
+ responseWaitingScheduler.initScheduler(this);
+ } else {
+ LOG.trace("Result has been cancelled or done.");
+ }
+ }
+ }
+