import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
static final long INIT_TX_TIMEOUT_SECONDS = 125;
private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+ private static final AtomicLong COUNTER = new AtomicLong();
/*
* writingExecutor is a single thread executor. Only this thread will write to datastore,
* incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
* This thread only adds to futures set.
*/
- private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final ScheduledExecutorService writingExecutor = newExecutorService("writing");
/*
* completingExecutor is a single thread executor. Only this thread writes to State.
* This thread should never incur any sleep penalty, so RPC response should always come on time.
* This thread only removes from futures set.
*/
- private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final ScheduledExecutorService completingExecutor = newExecutorService("completing");
private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
state = State.SUCCESSFUL;
completingFuture.cancel(false);
runSuccessful(txCounter.get());
+ shutdownExecutors();
return true;
}
state = State.FAILED;
writingFuture.cancel(false);
runFailed(cause, txId);
+ shutdownExecutors();
break;
default:
throw new IllegalStateException("Unhandled state " + local);
state = State.FAILED;
runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
+ shutdownExecutors();
+ }
+
+ private void shutdownExecutors() {
+ writingExecutor.shutdown();
+ completingExecutor.shutdown();
}
abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
abstract void runSuccessful(long allTx);
abstract void runTimedOut(String cause);
+
+ private ScheduledExecutorService newExecutorService(final String kind) {
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d")
+ .build());
+ executor.setKeepAliveTime(15, TimeUnit.SECONDS);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.clustering.it.provider.impl;
-
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A simple ScheduledExecutorService, which shuts down its threads after a period of inactivity. It is safe to not
- * shutdown this
- *
- * @author Robert Varga
- */
-final class FinalizableScheduledExecutorService extends ScheduledThreadPoolExecutor {
-
- private FinalizableScheduledExecutorService(final int maxThreads, final long time, final TimeUnit unit) {
- super(maxThreads);
- setKeepAliveTime(time, unit);
- allowCoreThreadTimeOut(true);
- }
-
- static ScheduledThreadPoolExecutor newSingleThread() {
- return new FinalizableScheduledExecutorService(1, 15, TimeUnit.SECONDS);
- }
-
- // This is a bit ugly, but allows
- @Override
- @SuppressWarnings("checkstyle:NoFinalizer")
- protected void finalize() {
- super.finalize();
- super.shutdownNow();
- }
-}