From ff0c6d28a939ed42161efbc71b8394756a1fc383 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 28 May 2020 07:16:01 +0200 Subject: [PATCH] Remove FinalizableScheduledExecutorService Use proper executor service shutdown after the handler has run its course instead of relying on garbage collection. Change-Id: I896ae5d28836f9dea48448c7c54a044279047a5c Signed-off-by: Robert Varga --- .../impl/AbstractTransactionHandler.java | 25 +++++++++++- .../FinalizableScheduledExecutorService.java | 38 ------------------- 2 files changed, 23 insertions(+), 40 deletions(-) delete mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index 6b0657ee6f..e821a2b576 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -11,12 +11,14 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -58,19 +60,20 @@ abstract class AbstractTransactionHandler { static final long INIT_TX_TIMEOUT_SECONDS = 125; private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); + private static final AtomicLong COUNTER = new AtomicLong(); /* * writingExecutor is a single thread executor. Only this thread will write to datastore, * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State. * This thread only adds to futures set. */ - private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + private final ScheduledExecutorService writingExecutor = newExecutorService("writing"); /* * completingExecutor is a single thread executor. Only this thread writes to State. * This thread should never incur any sleep penalty, so RPC response should always come on time. * This thread only removes from futures set. */ - private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + private final ScheduledExecutorService completingExecutor = newExecutorService("completing"); private final Collection> futures = Collections.synchronizedSet(new HashSet<>()); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; @@ -153,6 +156,7 @@ abstract class AbstractTransactionHandler { state = State.SUCCESSFUL; completingFuture.cancel(false); runSuccessful(txCounter.get()); + shutdownExecutors(); return true; } @@ -191,6 +195,7 @@ abstract class AbstractTransactionHandler { state = State.FAILED; writingFuture.cancel(false); runFailed(cause, txId); + shutdownExecutors(); break; default: throw new IllegalStateException("Unhandled state " + local); @@ -224,6 +229,12 @@ abstract class AbstractTransactionHandler { state = State.FAILED; runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"); + shutdownExecutors(); + } + + private void shutdownExecutors() { + writingExecutor.shutdown(); + completingExecutor.shutdown(); } abstract FluentFuture execWrite(long txId); @@ -233,4 +244,14 @@ abstract class AbstractTransactionHandler { abstract void runSuccessful(long allTx); abstract void runTimedOut(String cause); + + private ScheduledExecutorService newExecutorService(final String kind) { + final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d") + .build()); + executor.setKeepAliveTime(15, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + return executor; + } } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java deleted file mode 100644 index f0873034b4..0000000000 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.clustering.it.provider.impl; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * A simple ScheduledExecutorService, which shuts down its threads after a period of inactivity. It is safe to not - * shutdown this - * - * @author Robert Varga - */ -final class FinalizableScheduledExecutorService extends ScheduledThreadPoolExecutor { - - private FinalizableScheduledExecutorService(final int maxThreads, final long time, final TimeUnit unit) { - super(maxThreads); - setKeepAliveTime(time, unit); - allowCoreThreadTimeOut(true); - } - - static ScheduledThreadPoolExecutor newSingleThread() { - return new FinalizableScheduledExecutorService(1, 15, TimeUnit.SECONDS); - } - - // This is a bit ugly, but allows - @Override - @SuppressWarnings("checkstyle:NoFinalizer") - protected void finalize() { - super.finalize(); - super.shutdownNow(); - } -} -- 2.36.6