2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.concurrent;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import java.time.Duration;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
33 import static com.google.common.base.Preconditions.checkState;
34 import static io.atomix.utils.concurrent.Threads.namedThreads;
37 * Single threaded context.
39 * This is a basic {@link ThreadContext} implementation that uses a
40 * {@link ScheduledExecutorService} to schedule events on the context thread.
42 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
44 public class SingleThreadContext extends AbstractThreadContext {
45 protected static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class);
46 private final ScheduledExecutorService executor;
47 private final Executor wrappedExecutor = new Executor() {
49 public void execute(Runnable command) {
51 executor.execute(() -> {
54 } catch (Exception e) {
55 LOGGER.error("An uncaught exception occurred", e);
58 } catch (RejectedExecutionException e) {
64 * Creates a new single thread context.
66 * The provided context name will be passed to {@link AtomixThreadFactory} and used
67 * when instantiating the context thread.
69 * @param nameFormat The context nameFormat which will be formatted with a thread number.
71 public SingleThreadContext(String nameFormat) {
72 this(namedThreads(nameFormat, LOGGER));
76 * Creates a new single thread context.
78 * @param factory The thread factory.
80 public SingleThreadContext(ThreadFactory factory) {
81 this(new ScheduledThreadPoolExecutor(1, factory));
85 * Creates a new single thread context.
87 * @param executor The executor on which to schedule events. This must be a single thread scheduled executor.
89 protected SingleThreadContext(ScheduledExecutorService executor) {
90 this(getThread(executor), executor);
93 private SingleThreadContext(Thread thread, ScheduledExecutorService executor) {
94 this.executor = executor;
95 checkState(thread instanceof AtomixThread, "not a Catalyst thread");
96 ((AtomixThread) thread).setContext(this);
100 * Gets the thread from a single threaded executor service.
102 protected static AtomixThread getThread(ExecutorService executor) {
103 final AtomicReference<AtomixThread> thread = new AtomicReference<>();
105 executor.submit(() -> {
106 thread.set((AtomixThread) Thread.currentThread());
108 } catch (InterruptedException | ExecutionException e) {
109 throw new IllegalStateException("failed to initialize thread state", e);
115 public void execute(Runnable command) {
116 wrappedExecutor.execute(command);
120 public Scheduled schedule(Duration delay, Runnable runnable) {
121 ScheduledFuture<?> future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
122 return () -> future.cancel(false);
126 public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
127 ScheduledFuture<?> future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
128 return () -> future.cancel(false);
132 public void close() {
133 executor.shutdownNow();