2 * Copyright 2015-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.concurrent;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Lists;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.atomic.AtomicReference;
28 import static com.google.common.base.Preconditions.checkArgument;
29 import static com.google.common.base.Preconditions.checkNotNull;
32 * Base implementation of an item accumulator. It allows triggering based on
33 * item inter-arrival time threshold, maximum batch life threshold and maximum item count.
36 public abstract class AbstractAccumulator<T> implements Accumulator<T> {
// Logger used by ProcessorTask to report failures while processing a batch.
38 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
// Timer on which ProcessorTask instances are scheduled (see rescheduleTask).
40 private final Timer timer;
// Trigger thresholds; all three are validated and assigned once in the constructor.
41 private final int maxItems;
42 private final int maxBatchMillis;
43 private final int maxIdleMillis;
// Most recently scheduled idle-timeout task and max-batch-age task. Each reference holds
// null until a task is first scheduled, and again after cancelTask swaps null in.
45 private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
46 private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
// Pending items; this list is also the monitor that add and finalizeCurrentBatch synchronize on.
48 private final List<T> items;
51 * Creates an item accumulator capable of triggering on the specified thresholds.
54 * @param timer timer to use for scheduling check-points
55 * @param maxItems maximum number of items to accumulate before
56 * processing is triggered
58 * NB: It is possible that processItems will contain
59 * more than maxItems under high load or if isReady()
62 * @param maxBatchMillis maximum number of millis allowed since the first
63 * item before processing is triggered
64 * @param maxIdleMillis maximum number of millis between items before
65 * processing is triggered
// Validates the arguments, stores the three thresholds and allocates the item list.
// checkNotNull and checkArgument are the Guava Preconditions helpers statically imported
// by this file; they throw when the reference is null or the condition is false.
67 protected AbstractAccumulator(Timer timer, int maxItems,
68 int maxBatchMillis, int maxIdleMillis) {
69 this.timer = checkNotNull(timer, "Timer cannot be null");
// Note the strict lower bound: a maxItems value of 1 is rejected.
71 checkArgument(maxItems > 1, "Maximum number of items must be > 1");
72 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
73 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
75 this.maxItems = maxItems;
76 this.maxBatchMillis = maxBatchMillis;
77 this.maxIdleMillis = maxIdleMillis;
// Size the backing list for the expected batch size up front.
79 items = Lists.newArrayListWithExpectedSize(maxItems);
// Accepts one item and (re)arms the timers that will eventually trigger batch processing.
// NOTE(review): the warning text below says the item "was just added to the list", but no
// statement adding `item` to `items` is visible inside the synchronized block in this
// listing, and the closing braces are absent as well - confirm against the upstream file.
83 public void add(T item) {
84 final int sizeAtTimeOfAdd;
// The list size is sampled while holding the items monitor.
85 synchronized (items) {
87 sizeAtTimeOfAdd = items.size();
91 WARNING: It is possible that the item that was just added to the list
92 has been processed by an existing idle task at this point.
94 By rescheduling the following timers, it is possible that a
95 superfluous maxTask is generated now OR that the idle task and max
96 task are scheduled at their specified delays. This could result in
97 calls to processItems sooner than expected.
100 // Did we hit the max item threshold?
101 if (sizeAtTimeOfAdd >= maxItems) {
// This method schedules the idle task only when maxIdleMillis < maxBatchMillis (see below),
// so it cancels it under the same condition.
102 if (maxIdleMillis < maxBatchMillis) {
103 cancelTask(idleTask);
// A zero delay asks the timer to run a processing task as soon as it can.
105 rescheduleTask(maxTask, 0 /* now! */);
107 // Otherwise, schedule idle task and if this is a first item
108 // also schedule the max batch age task.
109 if (maxIdleMillis < maxBatchMillis) {
// Replacing the pending idle task pushes the idle deadline out by maxIdleMillis.
110 rescheduleTask(idleTask, maxIdleMillis);
// A size of 1 means this item is the first of a batch, so the batch-age timer starts here.
112 if (sizeAtTimeOfAdd == 1) {
113 rescheduleTask(maxTask, maxBatchMillis);
119 * Reschedules the specified task, cancelling existing one if applicable.
121 * @param taskRef task reference
122 * @param millis delay in milliseconds
// Creates a fresh ProcessorTask, schedules it on the timer to run after `millis`, then
// installs it in taskRef; the task previously held there is passed on to swapAndCancelTask.
124 private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
125 ProcessorTask newTask = new ProcessorTask();
// The new task is scheduled before the old one is swapped out of the reference.
126 timer.schedule(newTask, millis);
127 swapAndCancelTask(taskRef, newTask);
131 * Cancels the specified task if it has not run or is not running.
133 * @param taskRef task reference
// Clears taskRef by swapping in null; the task previously held is dealt with by swapAndCancelTask.
135 private void cancelTask(AtomicReference<TimerTask> taskRef) {
136 swapAndCancelTask(taskRef, null);
140 * Sets the new task and attempts to cancel the old one.
142 * @param taskRef task reference
143 * @param newTask new task
// Atomically replaces the task held by taskRef and then inspects the task that was there before.
// NOTE(review): this listing omits the rest of the parameter list (the body reads `newTask`)
// and the statement guarded by the null check; going by this method's description that
// statement presumably cancels oldTask - confirm against the upstream file.
145 private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
// getAndSet installs newTask and returns the previous value in one atomic step.
147 TimerTask oldTask = taskRef.getAndSet(newTask);
// oldTask is null when the reference held no task.
148 if (oldTask != null) {
153 // Task for triggering processing of accumulated items
// TimerTask subclass instantiated by rescheduleTask; it finalizes the current batch when it fires.
// NOTE(review): the run() header, the conditions guarding the two paths below and the
// statement that consumes a non-empty batch are not visible in this listing - confirm
// against the upstream file.
154 private class ProcessorTask extends TimerTask {
// Obtain the finalized snapshot of the pending items.
160 List<T> batch = finalizeCurrentBatch();
// Empty batches are skipped.
161 if (!batch.isEmpty()) {
// Re-arms the idle timer; the branch condition leading here is not visible in this listing.
165 rescheduleTask(idleTask, maxIdleMillis);
// Exceptions are caught and logged at WARN level; presumably this keeps a failing batch
// from terminating the shared Timer thread (see java.util.Timer) - confirm intent.
167 } catch (Exception e) {
168 log.warn("Unable to process batch due to", e);
174 * Returns an immutable copy of the existing items and clears the list.
176 * @return list of existing items
// Takes an immutable snapshot of the pending items while holding the items monitor,
// cancels the pending idle task and returns the snapshot.
// NOTE(review): this method's description says the list is also cleared, but no statement
// clearing `items` is visible in this listing - confirm against the upstream file.
178 private List<T> finalizeCurrentBatch() {
179 List<T> finalizedList;
180 synchronized (items) {
// ImmutableList.copyOf copies the elements, so later changes to items do not alter the result.
181 finalizedList = ImmutableList.copyOf(items);
184 * To avoid reprocessing being triggered on an empty list.
187 cancelTask(idleTask);
189 return finalizedList;
// Readiness query; the constructor's notes mention it in connection with batches that
// exceed maxItems.
// NOTE(review): the method body is not visible in this listing - confirm against the upstream file.
193 public boolean isReady() {
198 * Returns the backing timer.
200 * @return backing timer
// Accessor for the backing timer.
// NOTE(review): the method body is not visible in this listing - confirm against the upstream file.
202 public Timer timer() {
207 * Returns the maximum number of items allowed to accumulate before
208 * processing is triggered.
210 * @return max number of items
// Accessor for the item-count threshold.
// NOTE(review): the method body is not visible in this listing - confirm against the upstream file.
212 public int maxItems() {
217 * Returns the maximum number of millis allowed to expire since the first
218 * item before processing is triggered.
220 * @return max number of millis a batch is allowed to last
// Plain accessor: returns the maxBatchMillis value that the constructor validated and stored.
222 public int maxBatchMillis() {
223 return maxBatchMillis;
227 * Returns the maximum number of millis allowed to expire since the last
228 * item arrival before processing is triggered.
230 * @return max number of millis since the last item
// Plain accessor: returns the maxIdleMillis value that the constructor validated and stored.
232 public int maxIdleMillis() {
233 return maxIdleMillis;