private final long maximumPollingDelay;
private final boolean isUsingReconciliationFramework;
private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
- private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
- private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
+ private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
+ private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
private List<MultipartType> collectingStatType;
private StatisticsGatheringService<T> statisticsGatheringService;
private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
}
@Override
- public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
}
@Override
statListForCollecting.add(MultipartType.OFPMPTABLE);
}
- if (devState.isFlowStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPFLOW);
- }
-
if (devState.isGroupAvailable()) {
statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
statListForCollecting.add(MultipartType.OFPMPGROUP);
statListForCollecting.add(MultipartType.OFPMPMETER);
}
+ if (devState.isFlowStatisticsAvailable()) {
+ statListForCollecting.add(MultipartType.OFPMPFLOW);
+ }
+
if (devState.isPortStatisticsAvailable()) {
statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
}
return Futures.immediateFuture(Boolean.TRUE);
}
- return this.lastDataGathering.updateAndGet(future -> {
+ return this.lastDataGatheringRef.updateAndGet(future -> {
// write start timestamp to state snapshot container
StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
// build statistics gathering future
final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
.reduce(lastDataGathering, this::statChainFuture,
- (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn));
+ (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
+ MoreExecutors.directExecutor()));
// write end timestamp to state snapshot container
Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
statisticsWriterProvider, executorService) : Futures
.immediateFuture(Boolean.FALSE);
- });
+ }, MoreExecutors.directExecutor());
}
private void startGatheringData() {
schedulingEnabled.set(true);
statisticsPollingService.startAsync();
- this.statisticsPollingService.set(statisticsPollingService);
+ this.statisticsPollingServiceRef.set(statisticsPollingService);
}
private ListenableFuture<Void> stopGatheringData() {
LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
cancelLastDataGathering();
- return Optional.ofNullable(statisticsPollingService.getAndSet(null)).map(StatisticsPollingService::stop)
+ return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
.orElseGet(() -> Futures.immediateFuture(null));
}
private void cancelLastDataGathering() {
- final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
+ final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
future.cancel(true);