X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fconfig%2Fthreadpool-config-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fthreadpool%2Futil%2FFlexibleThreadPoolWrapper.java;fp=opendaylight%2Fconfig%2Fthreadpool-config-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fthreadpool%2Futil%2FFlexibleThreadPoolWrapper.java;h=0000000000000000000000000000000000000000;hp=6fb1adcb699c7056776961b27b67ab5346b54ec7;hb=9917911b1a492b5f9fbeef1591569f7fc4a80f68;hpb=77387f3590ca4c42c3ddce52131d5c00d2a3992d diff --git a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java deleted file mode 100644 index 6fb1adcb69..0000000000 --- a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.config.threadpool.util; - -import java.io.Closeable; -import java.util.OptionalInt; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.config.threadpool.ThreadPool; - -/** - * Implementation of {@link ThreadPool} using flexible number of threads wraps - * {@link ExecutorService}. - */ -public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { - private final ThreadPoolExecutor executor; - - public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, - final TimeUnit timeUnit, final ThreadFactory threadFactory) { - this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty())); - } - - public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, - final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) { - this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity)); - } - - private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, - final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue queue) { - - executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit, - queue, threadFactory, new FlexibleRejectionHandler()); - executor.prestartAllCoreThreads(); - } - - /** - * Overriding the queue: - * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding - * occurs in RejectedExecutionHandler. - * This impl saturates threadpool first, then queue. When both are full caller will get blocked. - */ - private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) { - return new ForwardingBlockingQueue( - capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.orElseThrow()) : new LinkedBlockingQueue<>()); - } - - @Override - public ExecutorService getExecutor() { - return Executors.unconfigurableExecutorService(executor); - } - - public int getMinThreadCount() { - return executor.getCorePoolSize(); - } - - public void setMinThreadCount(final int minThreadCount) { - executor.setCorePoolSize(minThreadCount); - } - - @Override - public int getMaxThreadCount() { - return executor.getMaximumPoolSize(); - } - - public void setMaxThreadCount(final int maxThreadCount) { - executor.setMaximumPoolSize(maxThreadCount); - } - - public long getKeepAliveMillis() { - return executor.getKeepAliveTime(TimeUnit.MILLISECONDS); - } - - public void setKeepAliveMillis(final long keepAliveMillis) { - executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS); - } - - public void setThreadFactory(final ThreadFactory threadFactory) { - executor.setThreadFactory(threadFactory); - } - - public void prestartAllCoreThreads() { - executor.prestartAllCoreThreads(); - } - - @Override - public void close() { - executor.shutdown(); - } - - /** - * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue. - */ - private static final class FlexibleRejectionHandler implements RejectedExecutionHandler { - @Override - @SuppressWarnings("checkstyle:parameterName") - public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - throw new RejectedExecutionException("Interrupted while waiting on the queue", e); - } - } - } - - private static final class ForwardingBlockingQueue - extends com.google.common.util.concurrent.ForwardingBlockingQueue { - private final BlockingQueue delegate; - - ForwardingBlockingQueue(final BlockingQueue delegate) { - this.delegate = delegate; - } - - @Override - protected BlockingQueue delegate() { - return delegate; - } - - @Override - @SuppressWarnings("checkstyle:parameterName") - public boolean offer(final Runnable o) { - // ThreadPoolExecutor will spawn a new thread after core size is reached only - // if the queue.offer returns false. - return false; - } - } -}