/* * Copyright 2015-present Open Networking Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.atomix.utils.concurrent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.LinkedList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkNotNull; /** * Thread pool context. *

* This is a special {@link ThreadContext} implementation that schedules events to be executed
 * on a thread pool. Events executed by this context are guaranteed to be executed in order but
 * may be executed on different threads in the provided thread pool.
 *
 * @author Jordan Halterman
 */
public class ThreadPoolContext extends AbstractThreadContext {
  private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolContext.class);

  /** Thread pool that runs the drain loop and backs the {@code schedule} methods. */
  protected final ScheduledExecutorService parent;

  /** Drain loop submitted to {@link #parent}; runs queued tasks one at a time until the queue is empty. */
  private final Runnable runner;

  /** Pending tasks in submission order; also the monitor that guards {@link #running}. */
  private final LinkedList<Runnable> tasks = new LinkedList<>();

  /** Whether the drain loop has been submitted and has not yet found the queue empty; guarded by {@code tasks}. */
  private boolean running;

  /**
   * Serializing executor: enqueues the command and submits the drain loop to the parent pool
   * only when one is not already active, so at most one drain loop exists at any time.
   */
  private final Executor executor = new Executor() {
    @Override
    public void execute(Runnable command) {
      synchronized (tasks) {
        tasks.add(command);
        if (!running) {
          running = true;
          parent.execute(runner);
        }
      }
    }
  };

  /**
   * Creates a new thread pool context.
   *
   * @param parent The thread pool on which to execute events.
   * @throws NullPointerException if {@code parent} is null
   */
  public ThreadPoolContext(ScheduledExecutorService parent) {
    this.parent = checkNotNull(parent, "parent cannot be null");

    // The ordered-execution technique below was adapted from Vert.x:
    // https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/impl/OrderedExecutorFactory.java
    runner = () -> {
      // NOTE(review): assumes every thread of the parent pool is an AtomixThread. A
      // ClassCastException here would escape before the loop with `running` still set.
      ((AtomixThread) Thread.currentThread()).setContext(this);
      for (;;) {
        final Runnable task;
        synchronized (tasks) {
          task = tasks.poll();
          if (task == null) {
            // Queue drained: clear the flag under the lock so the next execute() resubmits the loop.
            running = false;
            return;
          }
        }

        try {
          task.run();
        } catch (Throwable t) {
          // Log and keep draining. Rethrowing here would exit the loop while `running` is still
          // true, so no later execute() would ever resubmit the runner and every subsequently
          // queued task would be silently stranded.
          LOGGER.error("An uncaught exception occurred", t);
        }
      }
    };
  }

  /**
   * Queues the command for execution on the parent pool, after all previously queued commands.
   *
   * @param command the command to run
   */
  @Override
  public void execute(Runnable command) {
    executor.execute(command);
  }

  /**
   * Schedules the runnable to be queued on this context once the delay has elapsed. The delay is
   * applied with millisecond precision; the runnable then waits behind any tasks already queued.
   *
   * @param delay the time to wait before queueing the runnable
   * @param runnable the task to run
   * @return a handle that cancels the pending timer without interrupting a running task
   */
  @Override
  public Scheduled schedule(Duration delay, Runnable runnable) {
    ScheduledFuture<?> future = parent.schedule(
        () -> executor.execute(runnable), delay.toMillis(), TimeUnit.MILLISECONDS);
    return () -> future.cancel(false);
  }

  /**
   * Schedules the runnable to be queued on this context at a fixed rate. Each tick only enqueues
   * the runnable, so executions wait behind any tasks already queued on this context.
   *
   * @param delay the time to wait before the first tick
   * @param interval the period between ticks
   * @param runnable the task to run
   * @return a handle that cancels the timer without interrupting a running task
   */
  @Override
  public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
    ScheduledFuture<?> future = parent.scheduleAtFixedRate(
        () -> executor.execute(runnable), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    return () -> future.cancel(false);
  }

  /**
   * Does nothing; in particular the parent thread pool is not shut down by this method.
   */
  @Override
  public void close() {
    // Do nothing.
  }
}