import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
static final long INIT_TX_TIMEOUT_SECONDS = 125;
private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+ private static final AtomicLong COUNTER = new AtomicLong();
/*
* writingExecutor is a single thread executor. Only this thread will write to datastore,
* incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
* This thread only adds to futures set.
*/
- private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final ScheduledExecutorService writingExecutor = newExecutorService("writing");
/*
* completingExecutor is a single thread executor. Only this thread writes to State.
* This thread should never incur any sleep penalty, so RPC response should always come on time.
* This thread only removes from futures set.
*/
- private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final ScheduledExecutorService completingExecutor = newExecutorService("completing");
private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
state = State.SUCCESSFUL;
completingFuture.cancel(false);
runSuccessful(txCounter.get());
+ shutdownExecutors();
return true;
}
state = State.FAILED;
writingFuture.cancel(false);
runFailed(cause, txId);
+ shutdownExecutors();
break;
default:
throw new IllegalStateException("Unhandled state " + local);
state = State.FAILED;
runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
+ shutdownExecutors();
+ }
+
+ private void shutdownExecutors() {
+ writingExecutor.shutdown();
+ completingExecutor.shutdown();
}
abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
abstract void runSuccessful(long allTx);
abstract void runTimedOut(String cause);
+
+ private ScheduledExecutorService newExecutorService(final String kind) {
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d")
+ .build());
+ executor.setKeepAliveTime(15, TimeUnit.SECONDS);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
}