Remove Accumulator and its dependencies 74/104674/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 2 Mar 2023 00:11:44 +0000 (01:11 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 2 Mar 2023 00:13:43 +0000 (01:13 +0100)
We are not going to use this class, remove it along with everything that
(transitively) needs it.

JIRA: CONTROLLER-2071
Change-Id: I07e0b8143d54068e9ac6d1e8060279086ff79e22
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractAccumulator.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractThreadContext.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/Accumulator.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContext.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContextFactory.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContext.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContextFactory.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadModel.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContext.java [deleted file]
third-party/atomix/utils/src/main/java/io/atomix/utils/misc/SlidingWindowCounter.java [deleted file]

diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractAccumulator.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractAccumulator.java
deleted file mode 100644 (file)
index 9d8b1bc..0000000
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base implementation of an item accumulator. It allows triggering based on
- * item inter-arrival time threshold, maximum batch life threshold and maximum
- * batch size.
- */
-public abstract class AbstractAccumulator<T> implements Accumulator<T> {
-
-  private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
-
-  private final Timer timer;
-  private final int maxItems;
-  private final int maxBatchMillis;
-  private final int maxIdleMillis;
-
-  private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
-  private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
-
-  private final List<T> items;
-
-  /**
-   * Creates an item accumulator capable of triggering on the specified
-   * thresholds.
-   *
-   * @param timer          timer to use for scheduling check-points
-   * @param maxItems       maximum number of items to accumulate before
-   *                       processing is triggered
-   *                       <p>
-   *                       NB: It is possible that processItems will contain
-   *                       more than maxItems under high load or if isReady()
-   *                       can return false.
-   *                       </p>
-   * @param maxBatchMillis maximum number of millis allowed since the first
-   *                       item before processing is triggered
-   * @param maxIdleMillis  maximum number millis between items before
-   *                       processing is triggered
-   */
-  protected AbstractAccumulator(Timer timer, int maxItems,
-                                int maxBatchMillis, int maxIdleMillis) {
-    this.timer = checkNotNull(timer, "Timer cannot be null");
-
-    checkArgument(maxItems > 1, "Maximum number of items must be > 1");
-    checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
-    checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
-
-    this.maxItems = maxItems;
-    this.maxBatchMillis = maxBatchMillis;
-    this.maxIdleMillis = maxIdleMillis;
-
-    items = Lists.newArrayListWithExpectedSize(maxItems);
-  }
-
-  @Override
-  public void add(T item) {
-    final int sizeAtTimeOfAdd;
-    synchronized (items) {
-      items.add(item);
-      sizeAtTimeOfAdd = items.size();
-    }
-
-        /*
-            WARNING: It is possible that the item that was just added to the list
-            has been processed by an existing idle task at this point.
-
-            By rescheduling the following timers, it is possible that a
-            superfluous maxTask is generated now OR that the idle task and max
-            task are scheduled at their specified delays. This could result in
-            calls to processItems sooner than expected.
-         */
-
-    // Did we hit the max item threshold?
-    if (sizeAtTimeOfAdd >= maxItems) {
-      if (maxIdleMillis < maxBatchMillis) {
-        cancelTask(idleTask);
-      }
-      rescheduleTask(maxTask, 0 /* now! */);
-    } else {
-      // Otherwise, schedule idle task and if this is a first item
-      // also schedule the max batch age task.
-      if (maxIdleMillis < maxBatchMillis) {
-        rescheduleTask(idleTask, maxIdleMillis);
-      }
-      if (sizeAtTimeOfAdd == 1) {
-        rescheduleTask(maxTask, maxBatchMillis);
-      }
-    }
-  }
-
-  /**
-   * Reschedules the specified task, cancelling existing one if applicable.
-   *
-   * @param taskRef task reference
-   * @param millis  delay in milliseconds
-   */
-  private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
-    ProcessorTask newTask = new ProcessorTask();
-    timer.schedule(newTask, millis);
-    swapAndCancelTask(taskRef, newTask);
-  }
-
-  /**
-   * Cancels the specified task if it has not run or is not running.
-   *
-   * @param taskRef task reference
-   */
-  private void cancelTask(AtomicReference<TimerTask> taskRef) {
-    swapAndCancelTask(taskRef, null);
-  }
-
-  /**
-   * Sets the new task and attempts to cancelTask the old one.
-   *
-   * @param taskRef task reference
-   * @param newTask new task
-   */
-  private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
-                                 TimerTask newTask) {
-    TimerTask oldTask = taskRef.getAndSet(newTask);
-    if (oldTask != null) {
-      oldTask.cancel();
-    }
-  }
-
-  // Task for triggering processing of accumulated items
-  private class ProcessorTask extends TimerTask {
-    @Override
-    public void run() {
-      try {
-        if (isReady()) {
-
-          List<T> batch = finalizeCurrentBatch();
-          if (!batch.isEmpty()) {
-            processItems(batch);
-          }
-        } else {
-          rescheduleTask(idleTask, maxIdleMillis);
-        }
-      } catch (Exception e) {
-        log.warn("Unable to process batch due to", e);
-      }
-    }
-  }
-
-  /**
-   * Returns an immutable copy of the existing items and clear the list.
-   *
-   * @return list of existing items
-   */
-  private List<T> finalizeCurrentBatch() {
-    List<T> finalizedList;
-    synchronized (items) {
-      finalizedList = ImmutableList.copyOf(items);
-      items.clear();
-            /*
-             * To avoid reprocessing being triggered on an empty list.
-             */
-      cancelTask(maxTask);
-      cancelTask(idleTask);
-    }
-    return finalizedList;
-  }
-
-  @Override
-  public boolean isReady() {
-    return true;
-  }
-
-  /**
-   * Returns the backing timer.
-   *
-   * @return backing timer
-   */
-  public Timer timer() {
-    return timer;
-  }
-
-  /**
-   * Returns the maximum number of items allowed to accumulate before
-   * processing is triggered.
-   *
-   * @return max number of items
-   */
-  public int maxItems() {
-    return maxItems;
-  }
-
-  /**
-   * Returns the maximum number of millis allowed to expire since the first
-   * item before processing is triggered.
-   *
-   * @return max number of millis a batch is allowed to last
-   */
-  public int maxBatchMillis() {
-    return maxBatchMillis;
-  }
-
-  /**
-   * Returns the maximum number of millis allowed to expire since the last
-   * item arrival before processing is triggered.
-   *
-   * @return max number of millis since the last item
-   */
-  public int maxIdleMillis() {
-    return maxIdleMillis;
-  }
-
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractThreadContext.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/AbstractThreadContext.java
deleted file mode 100644 (file)
index 41846e2..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-/**
- * Abstract thread context.
- */
-public abstract class AbstractThreadContext implements ThreadContext {
-  private volatile boolean blocked;
-
-  @Override
-  public boolean isBlocked() {
-    return blocked;
-  }
-
-  @Override
-  public void block() {
-    blocked = true;
-  }
-
-  @Override
-  public void unblock() {
-    blocked = false;
-  }
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/Accumulator.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/Accumulator.java
deleted file mode 100644 (file)
index e949e90..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.List;
-
-/**
- * Abstraction of an accumulator capable of collecting items and at some
- * point in time triggers processing of all previously accumulated items.
- *
- * @param <T> item type
- */
-public interface Accumulator<T> {
-
-  /**
-   * Adds an item to the current batch. This operation may, or may not
-   * trigger processing of the current batch of items.
-   *
-   * @param item item to be added to the current batch
-   */
-  void add(T item);
-
-  /**
-   * Processes the specified list of accumulated items.
-   *
-   * @param items list of accumulated items
-   */
-  void processItems(List<T> items);
-
-  /**
-   * Indicates whether the accumulator is ready to process items.
-   *
-   * @return true if ready to process
-   */
-  boolean isReady();
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContext.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContext.java
deleted file mode 100644 (file)
index 3eed503..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadFactory;
-
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Blocking aware single thread context.
- */
-public class BlockingAwareSingleThreadContext extends SingleThreadContext {
-  private final Executor threadPoolExecutor;
-
-  public BlockingAwareSingleThreadContext(String nameFormat, Executor threadPoolExecutor) {
-    this(namedThreads(nameFormat, LOGGER), threadPoolExecutor);
-  }
-
-  public BlockingAwareSingleThreadContext(ThreadFactory factory, Executor threadPoolExecutor) {
-    super(factory);
-    this.threadPoolExecutor = threadPoolExecutor;
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    if (isBlocked()) {
-      threadPoolExecutor.execute(command);
-    } else {
-      super.execute(command);
-    }
-  }
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContextFactory.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareSingleThreadContextFactory.java
deleted file mode 100644 (file)
index 894fd14..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Single thread context factory.
- */
-public class BlockingAwareSingleThreadContextFactory implements ThreadContextFactory {
-  private final ThreadFactory threadFactory;
-  private final Executor threadPoolExecutor;
-
-  public BlockingAwareSingleThreadContextFactory(String nameFormat, int threadPoolSize, Logger logger) {
-    this(threadPoolSize, namedThreads(nameFormat, logger));
-  }
-
-  public BlockingAwareSingleThreadContextFactory(int threadPoolSize, ThreadFactory threadFactory) {
-    this(threadFactory, Executors.newScheduledThreadPool(threadPoolSize, threadFactory));
-  }
-
-  public BlockingAwareSingleThreadContextFactory(ThreadFactory threadFactory, Executor threadPoolExecutor) {
-    this.threadFactory = checkNotNull(threadFactory);
-    this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
-  }
-
-  @Override
-  public ThreadContext createContext() {
-    return new BlockingAwareSingleThreadContext(threadFactory, threadPoolExecutor);
-  }
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContext.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContext.java
deleted file mode 100644 (file)
index 3e88f18..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Blocking aware thread pool context.
- */
-public class BlockingAwareThreadPoolContext extends ThreadPoolContext {
-  public BlockingAwareThreadPoolContext(ScheduledExecutorService parent) {
-    super(parent);
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    if (isBlocked()) {
-      parent.execute(command);
-    } else {
-      super.execute(command);
-    }
-  }
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContextFactory.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/BlockingAwareThreadPoolContextFactory.java
deleted file mode 100644 (file)
index d69a4dd..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Thread pool context factory.
- */
-public class BlockingAwareThreadPoolContextFactory implements ThreadContextFactory {
-  private final ScheduledExecutorService executor;
-
-  public BlockingAwareThreadPoolContextFactory(String name, int threadPoolSize, Logger logger) {
-    this(threadPoolSize, namedThreads(name, logger));
-  }
-
-  public BlockingAwareThreadPoolContextFactory(int threadPoolSize, ThreadFactory threadFactory) {
-    this(Executors.newScheduledThreadPool(threadPoolSize, threadFactory));
-  }
-
-  public BlockingAwareThreadPoolContextFactory(ScheduledExecutorService executor) {
-    this.executor = executor;
-  }
-
-  @Override
-  public ThreadContext createContext() {
-    return new BlockingAwareThreadPoolContext(executor);
-  }
-
-  @Override
-  public void close() {
-    executor.shutdownNow();
-  }
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java
deleted file mode 100644 (file)
index 95ad86d..0000000
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkState;
-import static io.atomix.utils.concurrent.Threads.namedThreads;
-
-/**
- * Single threaded context.
- * <p>
- * This is a basic {@link ThreadContext} implementation that uses a
- * {@link ScheduledExecutorService} to schedule events on the context thread.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
- */
-public class SingleThreadContext extends AbstractThreadContext {
-  protected static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class);
-  private final ScheduledExecutorService executor;
-  private final Executor wrappedExecutor = new Executor() {
-    @Override
-    public void execute(Runnable command) {
-      try {
-        executor.execute(() -> {
-          try {
-            command.run();
-          } catch (Exception e) {
-            LOGGER.error("An uncaught exception occurred", e);
-          }
-        });
-      } catch (RejectedExecutionException e) {
-      }
-    }
-  };
-
-  /**
-   * Creates a new single thread context.
-   * <p>
-   * The provided context name will be passed to {@link AtomixThreadFactory} and used
-   * when instantiating the context thread.
-   *
-   * @param nameFormat The context nameFormat which will be formatted with a thread number.
-   */
-  public SingleThreadContext(String nameFormat) {
-    this(namedThreads(nameFormat, LOGGER));
-  }
-
-  /**
-   * Creates a new single thread context.
-   *
-   * @param factory The thread factory.
-   */
-  public SingleThreadContext(ThreadFactory factory) {
-    this(new ScheduledThreadPoolExecutor(1, factory));
-  }
-
-  /**
-   * Creates a new single thread context.
-   *
-   * @param executor The executor on which to schedule events. This must be a single thread scheduled executor.
-   */
-  protected SingleThreadContext(ScheduledExecutorService executor) {
-    this(getThread(executor), executor);
-  }
-
-  private SingleThreadContext(Thread thread, ScheduledExecutorService executor) {
-    this.executor = executor;
-    checkState(thread instanceof AtomixThread, "not a Catalyst thread");
-    ((AtomixThread) thread).setContext(this);
-  }
-
-  /**
-   * Gets the thread from a single threaded executor service.
-   */
-  protected static AtomixThread getThread(ExecutorService executor) {
-    final AtomicReference<AtomixThread> thread = new AtomicReference<>();
-    try {
-      executor.submit(() -> {
-        thread.set((AtomixThread) Thread.currentThread());
-      }).get();
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IllegalStateException("failed to initialize thread state", e);
-    }
-    return thread.get();
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    wrappedExecutor.execute(command);
-  }
-
-  @Override
-  public Scheduled schedule(Duration delay, Runnable runnable) {
-    ScheduledFuture<?> future = executor.schedule(runnable, delay.toMillis(), TimeUnit.MILLISECONDS);
-    return () -> future.cancel(false);
-  }
-
-  @Override
-  public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
-    ScheduledFuture<?> future = executor.scheduleAtFixedRate(runnable, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
-    return () -> future.cancel(false);
-  }
-
-  @Override
-  public void close() {
-    executor.shutdownNow();
-  }
-
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadModel.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadModel.java
deleted file mode 100644 (file)
index dbbf58c..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-
-/**
- * Raft thread model.
- */
-public enum ThreadModel {
-
-  /**
-   * A thread model that creates a thread pool to be shared by all services.
-   */
-  SHARED_THREAD_POOL {
-    @Override
-    public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
-      return new BlockingAwareThreadPoolContextFactory(nameFormat, threadPoolSize, logger);
-    }
-  },
-
-  /**
-   * A thread model that creates a thread for each Raft service.
-   */
-  THREAD_PER_SERVICE {
-    @Override
-    public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
-      return new BlockingAwareSingleThreadContextFactory(nameFormat, threadPoolSize, logger);
-    }
-  };
-
-  /**
-   * Returns a thread context factory.
-   *
-   * @param nameFormat the thread name format
-   * @param threadPoolSize the thread pool size
-   * @param logger the thread logger
-   * @return the thread context factory
-   */
-  public abstract ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger);
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContext.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContext.java
deleted file mode 100644 (file)
index 1e3c6f8..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.concurrent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Thread pool context.
- * <p>
- * This is a special {@link ThreadContext} implementation that schedules events to be executed
- * on a thread pool. Events executed by this context are guaranteed to be executed on order but may be executed on different
- * threads in the provided thread pool.
- *
- * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
- */
-public class ThreadPoolContext extends AbstractThreadContext {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolContext.class);
-  protected final ScheduledExecutorService parent;
-  private final Runnable runner;
-  private final LinkedList<Runnable> tasks = new LinkedList<>();
-  private boolean running;
-  private final Executor executor = new Executor() {
-    @Override
-    public void execute(Runnable command) {
-      synchronized (tasks) {
-        tasks.add(command);
-        if (!running) {
-          running = true;
-          parent.execute(runner);
-        }
-      }
-    }
-  };
-
-  /**
-   * Creates a new thread pool context.
-   *
-   * @param parent The thread pool on which to execute events.
-   */
-  public ThreadPoolContext(ScheduledExecutorService parent) {
-    this.parent = checkNotNull(parent, "parent cannot be null");
-
-    // This code was shamelessly stolededed from Vert.x:
-    // https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/impl/OrderedExecutorFactory.java
-    runner = () -> {
-      ((AtomixThread) Thread.currentThread()).setContext(this);
-      for (;;) {
-        final Runnable task;
-        synchronized (tasks) {
-          task = tasks.poll();
-          if (task == null) {
-            running = false;
-            return;
-          }
-        }
-
-        try {
-          task.run();
-        } catch (Throwable t) {
-          LOGGER.error("An uncaught exception occurred", t);
-          throw t;
-        }
-      }
-    };
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    executor.execute(command);
-  }
-
-  @Override
-  public Scheduled schedule(Duration delay, Runnable runnable) {
-    ScheduledFuture<?> future = parent.schedule(() -> executor.execute(runnable), delay.toMillis(), TimeUnit.MILLISECONDS);
-    return () -> future.cancel(false);
-  }
-
-  @Override
-  public Scheduled schedule(Duration delay, Duration interval, Runnable runnable) {
-    ScheduledFuture<?> future = parent.scheduleAtFixedRate(() -> executor.execute(runnable), delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
-    return () -> future.cancel(false);
-  }
-
-  @Override
-  public void close() {
-    // Do nothing.
-  }
-
-}
diff --git a/third-party/atomix/utils/src/main/java/io/atomix/utils/misc/SlidingWindowCounter.java b/third-party/atomix/utils/src/main/java/io/atomix/utils/misc/SlidingWindowCounter.java
deleted file mode 100644 (file)
index 154586b..0000000
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.misc;
-
-import io.atomix.utils.concurrent.Scheduled;
-import io.atomix.utils.concurrent.SingleThreadContext;
-import io.atomix.utils.concurrent.ThreadContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Maintains a sliding window of value counts. The sliding window counter is
- * initialized with a number of window slots. Calls to #incrementCount() will
- * increment the value in the current window slot. Periodically the window
- * slides and the oldest value count is dropped. Calls to #get() will get the
- * total count for the last N window slots.
- */
-public final class SlidingWindowCounter {
-  private final Logger log = LoggerFactory.getLogger(getClass());
-  private volatile int headSlot;
-  private final int windowSlots;
-
-  private final List<AtomicLong> counters;
-
-  private final Scheduled schedule;
-
-  private static final int SLIDE_WINDOW_PERIOD_SECONDS = 1;
-
-  public SlidingWindowCounter(int windowSlots) {
-    this(windowSlots, new SingleThreadContext("sliding-window-counter-%d"));
-  }
-
-  /**
-   * Creates a new sliding window counter with the given total number of
-   * window slots.
-   *
-   * @param windowSlots total number of window slots
-   */
-  public SlidingWindowCounter(int windowSlots, ThreadContext context) {
-    checkArgument(windowSlots > 0, "Window size must be a positive integer");
-
-    this.windowSlots = windowSlots;
-    this.headSlot = 0;
-
-    // Initialize each item in the list to an AtomicLong of 0
-    this.counters = Collections.nCopies(windowSlots, 0)
-        .stream()
-        .map(AtomicLong::new)
-        .collect(Collectors.toCollection(ArrayList::new));
-    this.schedule = context.schedule(0, SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS, this::advanceHead);
-  }
-
-  /**
-   * Releases resources used by the SlidingWindowCounter.
-   */
-  public void destroy() {
-    schedule.cancel();
-  }
-
-  /**
-   * Increments the count of the current window slot by 1.
-   */
-  public void incrementCount() {
-    incrementCount(headSlot, 1);
-  }
-
-  /**
-   * Increments the count of the current window slot by the given value.
-   *
-   * @param value value to increment by
-   */
-  public void incrementCount(long value) {
-    incrementCount(headSlot, value);
-  }
-
-  private void incrementCount(int slot, long value) {
-    counters.get(slot).addAndGet(value);
-  }
-
-  /**
-   * Gets the total count for the last N window slots.
-   *
-   * @param slots number of slots to include in the count
-   * @return total count for last N slots
-   */
-  public long get(int slots) {
-    checkArgument(slots <= windowSlots,
-        "Requested window must be less than the total window slots");
-
-    long sum = 0;
-
-    for (int i = 0; i < slots; i++) {
-      int currentIndex = headSlot - i;
-      if (currentIndex < 0) {
-        currentIndex = counters.size() + currentIndex;
-      }
-      sum += counters.get(currentIndex).get();
-    }
-
-    return sum;
-  }
-
-  void advanceHead() {
-    counters.get(slotAfter(headSlot)).set(0);
-    headSlot = slotAfter(headSlot);
-  }
-
-  private int slotAfter(int slot) {
-    return (slot + 1) % windowSlots;
-  }
-
-}