Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / concurrent / AbstractAccumulator.java
1 /*
2  * Copyright 2015-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.concurrent;
17
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Lists;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import static com.google.common.base.Preconditions.checkArgument;
29 import static com.google.common.base.Preconditions.checkNotNull;
30
31 /**
32  * Base implementation of an item accumulator. It allows triggering based on
33  * item inter-arrival time threshold, maximum batch life threshold and maximum
34  * batch size.
35  */
36 public abstract class AbstractAccumulator<T> implements Accumulator<T> {
37
38   private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
39
40   private final Timer timer;
41   private final int maxItems;
42   private final int maxBatchMillis;
43   private final int maxIdleMillis;
44
45   private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
46   private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
47
48   private final List<T> items;
49
50   /**
51    * Creates an item accumulator capable of triggering on the specified
52    * thresholds.
53    *
54    * @param timer          timer to use for scheduling check-points
55    * @param maxItems       maximum number of items to accumulate before
56    *                       processing is triggered
57    *                       <p>
58    *                       NB: It is possible that processItems will contain
59    *                       more than maxItems under high load or if isReady()
60    *                       can return false.
61    *                       </p>
62    * @param maxBatchMillis maximum number of millis allowed since the first
63    *                       item before processing is triggered
64    * @param maxIdleMillis  maximum number millis between items before
65    *                       processing is triggered
66    */
67   protected AbstractAccumulator(Timer timer, int maxItems,
68                                 int maxBatchMillis, int maxIdleMillis) {
69     this.timer = checkNotNull(timer, "Timer cannot be null");
70
71     checkArgument(maxItems > 1, "Maximum number of items must be > 1");
72     checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
73     checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
74
75     this.maxItems = maxItems;
76     this.maxBatchMillis = maxBatchMillis;
77     this.maxIdleMillis = maxIdleMillis;
78
79     items = Lists.newArrayListWithExpectedSize(maxItems);
80   }
81
82   @Override
83   public void add(T item) {
84     final int sizeAtTimeOfAdd;
85     synchronized (items) {
86       items.add(item);
87       sizeAtTimeOfAdd = items.size();
88     }
89
90         /*
91             WARNING: It is possible that the item that was just added to the list
92             has been processed by an existing idle task at this point.
93
94             By rescheduling the following timers, it is possible that a
95             superfluous maxTask is generated now OR that the idle task and max
96             task are scheduled at their specified delays. This could result in
97             calls to processItems sooner than expected.
98          */
99
100     // Did we hit the max item threshold?
101     if (sizeAtTimeOfAdd >= maxItems) {
102       if (maxIdleMillis < maxBatchMillis) {
103         cancelTask(idleTask);
104       }
105       rescheduleTask(maxTask, 0 /* now! */);
106     } else {
107       // Otherwise, schedule idle task and if this is a first item
108       // also schedule the max batch age task.
109       if (maxIdleMillis < maxBatchMillis) {
110         rescheduleTask(idleTask, maxIdleMillis);
111       }
112       if (sizeAtTimeOfAdd == 1) {
113         rescheduleTask(maxTask, maxBatchMillis);
114       }
115     }
116   }
117
118   /**
119    * Reschedules the specified task, cancelling existing one if applicable.
120    *
121    * @param taskRef task reference
122    * @param millis  delay in milliseconds
123    */
124   private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
125     ProcessorTask newTask = new ProcessorTask();
126     timer.schedule(newTask, millis);
127     swapAndCancelTask(taskRef, newTask);
128   }
129
130   /**
131    * Cancels the specified task if it has not run or is not running.
132    *
133    * @param taskRef task reference
134    */
135   private void cancelTask(AtomicReference<TimerTask> taskRef) {
136     swapAndCancelTask(taskRef, null);
137   }
138
139   /**
140    * Sets the new task and attempts to cancelTask the old one.
141    *
142    * @param taskRef task reference
143    * @param newTask new task
144    */
145   private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
146                                  TimerTask newTask) {
147     TimerTask oldTask = taskRef.getAndSet(newTask);
148     if (oldTask != null) {
149       oldTask.cancel();
150     }
151   }
152
153   // Task for triggering processing of accumulated items
154   private class ProcessorTask extends TimerTask {
155     @Override
156     public void run() {
157       try {
158         if (isReady()) {
159
160           List<T> batch = finalizeCurrentBatch();
161           if (!batch.isEmpty()) {
162             processItems(batch);
163           }
164         } else {
165           rescheduleTask(idleTask, maxIdleMillis);
166         }
167       } catch (Exception e) {
168         log.warn("Unable to process batch due to", e);
169       }
170     }
171   }
172
173   /**
174    * Returns an immutable copy of the existing items and clear the list.
175    *
176    * @return list of existing items
177    */
178   private List<T> finalizeCurrentBatch() {
179     List<T> finalizedList;
180     synchronized (items) {
181       finalizedList = ImmutableList.copyOf(items);
182       items.clear();
183             /*
184              * To avoid reprocessing being triggered on an empty list.
185              */
186       cancelTask(maxTask);
187       cancelTask(idleTask);
188     }
189     return finalizedList;
190   }
191
192   @Override
193   public boolean isReady() {
194     return true;
195   }
196
197   /**
198    * Returns the backing timer.
199    *
200    * @return backing timer
201    */
202   public Timer timer() {
203     return timer;
204   }
205
206   /**
207    * Returns the maximum number of items allowed to accumulate before
208    * processing is triggered.
209    *
210    * @return max number of items
211    */
212   public int maxItems() {
213     return maxItems;
214   }
215
216   /**
217    * Returns the maximum number of millis allowed to expire since the first
218    * item before processing is triggered.
219    *
220    * @return max number of millis a batch is allowed to last
221    */
222   public int maxBatchMillis() {
223     return maxBatchMillis;
224   }
225
226   /**
227    * Returns the maximum number of millis allowed to expire since the last
228    * item arrival before processing is triggered.
229    *
230    * @return max number of millis since the last item
231    */
232   public int maxIdleMillis() {
233     return maxIdleMillis;
234   }
235
236 }