Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / concurrent / SingleThreadContext.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.concurrent;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.time.Duration;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import static com.google.common.base.Preconditions.checkState;
34 import static io.atomix.utils.concurrent.Threads.namedThreads;
35
36 /**
37  * Single threaded context.
38  * <p>
39  * This is a basic {@link ThreadContext} implementation that uses a
40  * {@link ScheduledExecutorService} to schedule events on the context thread.
41  *
42  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
43  */
44 public class SingleThreadContext extends AbstractThreadContext {
45   protected static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class);
46   private final ScheduledExecutorService executor;
47   private final Executor wrappedExecutor = new Executor() {
48     @Override
49     public void execute(Runnable command) {
50       try {
51         executor.execute(() -> {
52           try {
53             command.run();
54           } catch (Exception e) {
55             LOGGER.error("An uncaught exception occurred", e);
56           }
57         });
58       } catch (RejectedExecutionException e) {
59       }
60     }
61   };
62
63   /**
64    * Creates a new single thread context.
65    * <p>
66    * The provided context name will be passed to {@link AtomixThreadFactory} and used
67    * when instantiating the context thread.
68    *
69    * @param nameFormat The context nameFormat which will be formatted with a thread number.
70    */
71   public SingleThreadContext(String nameFormat) {
72     this(namedThreads(nameFormat, LOGGER));
73   }
74
75   /**
76    * Creates a new single thread context.
77    *
78    * @param factory The thread factory.
79    */
80   public SingleThreadContext(ThreadFactory factory) {
81     this(new ScheduledThreadPoolExecutor(1, factory));
82   }
83
84   /**
85    * Creates a new single thread context.
86    *
87    * @param executor The executor on which to schedule events. This must be a single thread scheduled executor.
88    */
89   protected SingleThreadContext(ScheduledExecutorService executor) {
90     this(getThread(executor), executor);
91   }
92
93   private SingleThreadContext(Thread thread, ScheduledExecutorService executor) {
94     this.executor = executor;
95     checkState(thread instanceof AtomixThread, "not a Catalyst thread");
96     ((AtomixThread) thread).setContext(this);
97   }
98
99   /**
100    * Gets the thread from a single threaded executor service.
101    */
102   protected static AtomixThread getThread(ExecutorService executor) {
103     final AtomicReference<AtomixThread> thread = new AtomicReference<>();
104     try {
105       executor.submit(() -> {
106         thread.set((AtomixThread) Thread.currentThread());
107       }).get();
108     } catch (InterruptedException | ExecutionException e) {
109       throw new IllegalStateException("failed to initialize thread state", e);
110     }
111     return thread.get();
112   }
113
114   @Override
115   public void execute(Runnable command) {
116     wrappedExecutor.execute(command);
117   }
118
119   @Override
120   public Scheduled schedule(Duration delay, Runnable runnable) {
121     ScheduledFuture<?> future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
122     return () -> future.cancel(false);
123   }
124
125   @Override
126   public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
127     ScheduledFuture<?> future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
128     return () -> future.cancel(false);
129   }
130
131   @Override
132   public void close() {
133     executor.shutdownNow();
134   }
135
136 }