static final long STARTUP_LOOP_TICK = 500L;
static final int STARTUP_LOOP_MAX_RETRIES = 8;
- static private final int PROCESSING_THREAD_NUMBER = 8;
- private final ExecutorService[] executorServiceLot;
+ private final ExecutorService executorService = Executors.newFixedThreadPool(8);
+
private static final AtomicIntegerFieldUpdater<AbstractDropTest> SENT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractDropTest.class, "sent");
private volatile int sent;
private static final AtomicIntegerFieldUpdater<AbstractDropTest> EXCS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractDropTest.class, "excs");
private volatile int excs;
- public AbstractDropTest() {
- executorServiceLot = new ExecutorService[PROCESSING_THREAD_NUMBER];
- for (int i = 0; i < PROCESSING_THREAD_NUMBER; i++) {
- executorServiceLot[i] = Executors.newFixedThreadPool(1);
- }
- }
-
public final DropTestStats getStats() {
return new DropTestStats(this.sent, this.rcvd, this.excs);
}
RCVD_UPDATER.incrementAndGet(this);
- NodeKey nodeKey = notification.getIngress().getValue().firstKeyOf(Node.class, NodeKey.class);
- int nodeIdHash = nodeKey.getId().getValue().hashCode();
-
- executorServiceLot[nodeIdHash % PROCESSING_THREAD_NUMBER].submit(new Runnable() {
+ executorService.submit(new Runnable() {
@Override
public void run() {
@Override
public void close() {
- for (ExecutorService service : executorServiceLot) {
- service.shutdown();
- }
+ executorService.shutdown();
}
}