+++ /dev/null
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base implementation of an item accumulator. It allows triggering based on
- * item inter-arrival time threshold, maximum batch life threshold and maximum
- * batch size.
- */
-public abstract class AbstractAccumulator<T> implements Accumulator<T> {
-
- private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
-
- private final Timer timer;
- private final int maxItems;
- private final int maxBatchMillis;
- private final int maxIdleMillis;
-
- private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
- private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
-
- private final List<T> items;
-
- /**
- * Creates an item accumulator capable of triggering on the specified
- * thresholds.
- *
- * @param timer timer to use for scheduling check-points
- * @param maxItems maximum number of items to accumulate before
- * processing is triggered
- * <p>
- * NB: It is possible that processItems will contain
- * more than maxItems under high load or if isReady()
- * can return false.
- * </p>
- * @param maxBatchMillis maximum number of millis allowed since the first
- * item before processing is triggered
- * @param maxIdleMillis maximum number millis between items before
- * processing is triggered
- */
- protected AbstractAccumulator(Timer timer, int maxItems,
- int maxBatchMillis, int maxIdleMillis) {
- this.timer = checkNotNull(timer, "Timer cannot be null");
-
- checkArgument(maxItems > 1, "Maximum number of items must be > 1");
- checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
- checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
-
- this.maxItems = maxItems;
- this.maxBatchMillis = maxBatchMillis;
- this.maxIdleMillis = maxIdleMillis;
-
- items = Lists.newArrayListWithExpectedSize(maxItems);
- }
-
- @Override
- public void add(T item) {
- final int sizeAtTimeOfAdd;
- synchronized (items) {
- items.add(item);
- sizeAtTimeOfAdd = items.size();
- }
-
- /*
- WARNING: It is possible that the item that was just added to the list
- has been processed by an existing idle task at this point.
-
- By rescheduling the following timers, it is possible that a
- superfluous maxTask is generated now OR that the idle task and max
- task are scheduled at their specified delays. This could result in
- calls to processItems sooner than expected.
- */
-
- // Did we hit the max item threshold?
- if (sizeAtTimeOfAdd >= maxItems) {
- if (maxIdleMillis < maxBatchMillis) {
- cancelTask(idleTask);
- }
- rescheduleTask(maxTask, 0 /* now! */);
- } else {
- // Otherwise, schedule idle task and if this is a first item
- // also schedule the max batch age task.
- if (maxIdleMillis < maxBatchMillis) {
- rescheduleTask(idleTask, maxIdleMillis);
- }
- if (sizeAtTimeOfAdd == 1) {
- rescheduleTask(maxTask, maxBatchMillis);
- }
- }
- }
-
- /**
- * Reschedules the specified task, cancelling existing one if applicable.
- *
- * @param taskRef task reference
- * @param millis delay in milliseconds
- */
- private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
- ProcessorTask newTask = new ProcessorTask();
- timer.schedule(newTask, millis);
- swapAndCancelTask(taskRef, newTask);
- }
-
- /**
- * Cancels the specified task if it has not run or is not running.
- *
- * @param taskRef task reference
- */
- private void cancelTask(AtomicReference<TimerTask> taskRef) {
- swapAndCancelTask(taskRef, null);
- }
-
- /**
- * Sets the new task and attempts to cancelTask the old one.
- *
- * @param taskRef task reference
- * @param newTask new task
- */
- private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
- TimerTask newTask) {
- TimerTask oldTask = taskRef.getAndSet(newTask);
- if (oldTask != null) {
- oldTask.cancel();
- }
- }
-
- // Task for triggering processing of accumulated items
- private class ProcessorTask extends TimerTask {
- @Override
- public void run() {
- try {
- if (isReady()) {
-
- List<T> batch = finalizeCurrentBatch();
- if (!batch.isEmpty()) {
- processItems(batch);
- }
- } else {
- rescheduleTask(idleTask, maxIdleMillis);
- }
- } catch (Exception e) {
- log.warn("Unable to process batch due to", e);
- }
- }
- }
-
- /**
- * Returns an immutable copy of the existing items and clear the list.
- *
- * @return list of existing items
- */
- private List<T> finalizeCurrentBatch() {
- List<T> finalizedList;
- synchronized (items) {
- finalizedList = ImmutableList.copyOf(items);
- items.clear();
- /*
- * To avoid reprocessing being triggered on an empty list.
- */
- cancelTask(maxTask);
- cancelTask(idleTask);
- }
- return finalizedList;
- }
-
- @Override
- public boolean isReady() {
- return true;
- }
-
- /**
- * Returns the backing timer.
- *
- * @return backing timer
- */
- public Timer timer() {
- return timer;
- }
-
- /**
- * Returns the maximum number of items allowed to accumulate before
- * processing is triggered.
- *
- * @return max number of items
- */
- public int maxItems() {
- return maxItems;
- }
-
- /**
- * Returns the maximum number of millis allowed to expire since the first
- * item before processing is triggered.
- *
- * @return max number of millis a batch is allowed to last
- */
- public int maxBatchMillis() {
- return maxBatchMillis;
- }
-
- /**
- * Returns the maximum number of millis allowed to expire since the last
- * item arrival before processing is triggered.
- *
- * @return max number of millis since the last item
- */
- public int maxIdleMillis() {
- return maxIdleMillis;
- }
-
-}
+++ /dev/null
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-/**
- * Abstract thread context.
- */
-public abstract class AbstractThreadContext implements ThreadContext {
- private volatile boolean blocked;
-
- @Override
- public boolean isBlocked() {
- return blocked;
- }
-
- @Override
- public void block() {
- blocked = true;
- }
-
- @Override
- public void unblock() {
- blocked = false;
- }
-}
+++ /dev/null
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.List;
-
-/**
- * Abstraction of an accumulator capable of collecting items and at some
- * point in time triggers processing of all previously accumulated items.
- *
- * @param <T> item type
- */
-public interface Accumulator<T> {
-
- /**
- * Adds an item to the current batch. This operation may, or may not
- * trigger processing of the current batch of items.
- *
- * @param item item to be added to the current batch
- */
- void add(T item);
-
- /**
- * Processes the specified list of accumulated items.
- *
- * @param items list of accumulated items
- */
- void processItems(List<T> items);
-
- /**
- * Indicates whether the accumulator is ready to process items.
- *
- * @return true if ready to process
- */
- boolean isReady();
-}
+++ /dev/null
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadFactory;
-
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Blocking aware single thread context.
- */
-public class BlockingAwareSingleThreadContext extends SingleThreadContext {
- private final Executor threadPoolExecutor;
-
- public BlockingAwareSingleThreadContext(String nameFormat, Executor threadPoolExecutor) {
- this(namedThreads(nameFormat, LOGGER), threadPoolExecutor);
- }
-
- public BlockingAwareSingleThreadContext(ThreadFactory factory, Executor threadPoolExecutor) {
- super(factory);
- this.threadPoolExecutor = threadPoolExecutor;
- }
-
- @Override
- public void execute(Runnable command) {
- if (isBlocked()) {
- threadPoolExecutor.execute(command);
- } else {
- super.execute(command);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Single thread context factory.
- */
-public class BlockingAwareSingleThreadContextFactory implements ThreadContextFactory {
- private final ThreadFactory threadFactory;
- private final Executor threadPoolExecutor;
-
- public BlockingAwareSingleThreadContextFactory(String nameFormat, int threadPoolSize, Logger logger) {
- this(threadPoolSize, namedThreads(nameFormat, logger));
- }
-
- public BlockingAwareSingleThreadContextFactory(int threadPoolSize, ThreadFactory threadFactory) {
- this(threadFactory, Executors.newScheduledThreadPool(threadPoolSize, threadFactory));
- }
-
- public BlockingAwareSingleThreadContextFactory(ThreadFactory threadFactory, Executor threadPoolExecutor) {
- this.threadFactory = checkNotNull(threadFactory);
- this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
- }
-
- @Override
- public ThreadContext createContext() {
- return new BlockingAwareSingleThreadContext(threadFactory, threadPoolExecutor);
- }
-}
+++ /dev/null
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Blocking aware thread pool context.
- */
-public class BlockingAwareThreadPoolContext extends ThreadPoolContext {
- public BlockingAwareThreadPoolContext(ScheduledExecutorService parent) {
- super(parent);
- }
-
- @Override
- public void execute(Runnable command) {
- if (isBlocked()) {
- parent.execute(command);
- } else {
- super.execute(command);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Thread pool context factory.
- */
-public class BlockingAwareThreadPoolContextFactory implements ThreadContextFactory {
- private final ScheduledExecutorService executor;
-
- public BlockingAwareThreadPoolContextFactory(String name, int threadPoolSize, Logger logger) {
- this(threadPoolSize, namedThreads(name, logger));
- }
-
- public BlockingAwareThreadPoolContextFactory(int threadPoolSize, ThreadFactory threadFactory) {
- this(Executors.newScheduledThreadPool(threadPoolSize, threadFactory));
- }
-
- public BlockingAwareThreadPoolContextFactory(ScheduledExecutorService executor) {
- this.executor = executor;
- }
-
- @Override
- public ThreadContext createContext() {
- return new BlockingAwareThreadPoolContext(executor);
- }
-
- @Override
- public void close() {
- executor.shutdownNow();
- }
-}
+++ /dev/null
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkState;
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Single threaded context.
- * <p>
- * This is a basic {@link ThreadContext} implementation that uses a
- * {@link ScheduledExecutorService} to schedule events on the context thread.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
- */
-public class SingleThreadContext extends AbstractThreadContext {
- protected static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class);
- private final ScheduledExecutorService executor;
- private final Executor wrappedExecutor = new Executor() {
- @Override
- public void execute(Runnable command) {
- try {
- executor.execute(() -> {
- try {
- command.run();
- } catch (Exception e) {
- LOGGER.error("An uncaught exception occurred", e);
- }
- });
- } catch (RejectedExecutionException e) {
- }
- }
- };
-
- /**
- * Creates a new single thread context.
- * <p>
- * The provided context name will be passed to {@link AtomixThreadFactory} and used
- * when instantiating the context thread.
- *
- * @param nameFormat The context nameFormat which will be formatted with a thread number.
- */
- public SingleThreadContext(String nameFormat) {
- this(namedThreads(nameFormat, LOGGER));
- }
-
- /**
- * Creates a new single thread context.
- *
- * @param factory The thread factory.
- */
- public SingleThreadContext(ThreadFactory factory) {
- this(new ScheduledThreadPoolExecutor(1, factory));
- }
-
- /**
- * Creates a new single thread context.
- *
- * @param executor The executor on which to schedule events. This must be a single thread scheduled executor.
- */
- protected SingleThreadContext(ScheduledExecutorService executor) {
- this(getThread(executor), executor);
- }
-
- private SingleThreadContext(Thread thread, ScheduledExecutorService executor) {
- this.executor = executor;
- checkState(thread instanceof AtomixThread, "not a Catalyst thread");
- ((AtomixThread) thread).setContext(this);
- }
-
- /**
- * Gets the thread from a single threaded executor service.
- */
- protected static AtomixThread getThread(ExecutorService executor) {
- final AtomicReference<AtomixThread> thread = new AtomicReference<>();
- try {
- executor.submit(() -> {
- thread.set((AtomixThread) Thread.currentThread());
- }).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("failed to initialize thread state", e);
- }
- return thread.get();
- }
-
- @Override
- public void execute(Runnable command) {
- wrappedExecutor.execute(command);
- }
-
- @Override
- public Scheduled schedule(Duration delay, Runnable runnable) {
- ScheduledFuture<?> future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
- return () -> future.cancel(false);
- }
-
- @Override
- public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
- ScheduledFuture<?> future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
- return () -> future.cancel(false);
- }
-
- @Override
- public void close() {
- executor.shutdownNow();
- }
-
-}
+++ /dev/null
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-/**
- * Raft thread model.
- */
-public enum ThreadModel {
-
- /**
- * A thread model that creates a thread pool to be shared by all services.
- */
- SHARED_THREAD_POOL {
- @Override
- public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
- return new BlockingAwareThreadPoolContextFactory(nameFormat, threadPoolSize, logger);
- }
- },
-
- /**
- * A thread model that creates a thread for each Raft service.
- */
- THREAD_PER_SERVICE {
- @Override
- public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
- return new BlockingAwareSingleThreadContextFactory(nameFormat, threadPoolSize, logger);
- }
- };
-
- /**
- * Returns a thread context factory.
- *
- * @param nameFormat the thread name format
- * @param threadPoolSize the thread pool size
- * @param logger the thread logger
- * @return the thread context factory
- */
- public abstract ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger);
-}
+++ /dev/null
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Thread pool context.
- * <p>
- * This is a special {@link ThreadContext} implementation that schedules events to be executed
- * on a thread pool. Events executed by this context are guaranteed to be executed on order but may be executed on different
- * threads in the provided thread pool.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
- */
-public class ThreadPoolContext extends AbstractThreadContext {
- private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolContext.class);
- protected final ScheduledExecutorService parent;
- private final Runnable runner;
- private final LinkedList<Runnable> tasks = new LinkedList<>();
- private boolean running;
- private final Executor executor = new Executor() {
- @Override
- public void execute(Runnable command) {
- synchronized (tasks) {
- tasks.add(command);
- if (!running) {
- running = true;
- parent.execute(runner);
- }
- }
- }
- };
-
- /**
- * Creates a new thread pool context.
- *
- * @param parent The thread pool on which to execute events.
- */
- public ThreadPoolContext(ScheduledExecutorService parent) {
- this.parent = checkNotNull(parent, "parent cannot be null");
-
- // This code was shamelessly stolededed from Vert.x:
- // https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/impl/OrderedExecutorFactory.java
- runner = () -> {
- ((AtomixThread) Thread.currentThread()).setContext(this);
- for (;;) {
- final Runnable task;
- synchronized (tasks) {
- task = tasks.poll();
- if (task == null) {
- running = false;
- return;
- }
- }
-
- try {
- task.run();
- } catch (Throwable t) {
- LOGGER.error("An uncaught exception occurred", t);
- throw t;
- }
- }
- };
- }
-
- @Override
- public void execute(Runnable command) {
- executor.execute(command);
- }
-
- @Override
- public Scheduled schedule(Duration delay, Runnable runnable) {
- ScheduledFuture<?> future = parent.schedule(() -> executor.execute(runnable), delay.toMillis(), TimeUnit.MILLISECONDS);
- return () -> future.cancel(false);
- }
-
- @Override
- public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
- ScheduledFuture<?> future = parent.scheduleAtFixedRate(() -> executor.execute(runnable), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
- return () -> future.cancel(false);
- }
-
- @Override
- public void close() {
- // Do nothing.
- }
-
-}
+++ /dev/null
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.misc;
-
-import io.atomix.utils.concurrent.Scheduled;
-import io.atomix.utils.concurrent.SingleThreadContext;
-import io.atomix.utils.concurrent.ThreadContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Maintains a sliding window of value counts. The sliding window counter is
- * initialized with a number of window slots. Calls to #incrementCount() will
- * increment the value in the current window slot. Periodically the window
- * slides and the oldest value count is dropped. Calls to #get() will get the
- * total count for the last N window slots.
- */
-public final class SlidingWindowCounter {
- private final Logger log = LoggerFactory.getLogger(getClass());
- private volatile int headSlot;
- private final int windowSlots;
-
- private final List<AtomicLong> counters;
-
- private final Scheduled schedule;
-
- private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
-
- public SlidingWindowCounter(int windowSlots) {
- this(windowSlots, new SingleThreadContext("sliding-window-counter-%d"));
- }
-
- /**
- * Creates a new sliding window counter with the given total number of
- * window slots.
- *
- * @param windowSlots total number of window slots
- */
- public SlidingWindowCounter(int windowSlots, ThreadContext context) {
- checkArgument(windowSlots > 0, "Window size must be a positive integer");
-
- this.windowSlots = windowSlots;
- this.headSlot = 0;
-
- // Initialize each item in the list to an AtomicLong of 0
- this.counters = Collections.nCopies(windowSlots, 0)
- .stream()
- .map(AtomicLong::new)
- .collect(Collectors.toCollection(ArrayList::new));
- this.schedule = context.schedule(0, SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS, this::advanceHead);
- }
-
- /**
- * Releases resources used by the SlidingWindowCounter.
- */
- public void destroy() {
- schedule.cancel();
- }
-
- /**
- * Increments the count of the current window slot by 1.
- */
- public void incrementCount() {
- incrementCount(headSlot, 1);
- }
-
- /**
- * Increments the count of the current window slot by the given value.
- *
- * @param value value to increment by
- */
- public void incrementCount(long value) {
- incrementCount(headSlot, value);
- }
-
- private void incrementCount(int slot, long value) {
- counters.get(slot).addAndGet(value);
- }
-
- /**
- * Gets the total count for the last N window slots.
- *
- * @param slots number of slots to include in the count
- * @return total count for last N slots
- */
- public long get(int slots) {
- checkArgument(slots <= windowSlots,
- "Requested window must be less than the total window slots");
-
- long sum = 0;
-
- for (int i = 0; i < slots; i++) {
- int currentIndex = headSlot - i;
- if (currentIndex < 0) {
- currentIndex = counters.size() + currentIndex;
- }
- sum += counters.get(currentIndex).get();
- }
-
- return sum;
- }
-
- void advanceHead() {
- counters.get(slotAfter(headSlot)).set(0);
- headSlot = slotAfter(headSlot);
- }
-
- private int slotAfter(int slot) {
- return (slot + 1) % windowSlots;
- }
-
-}