import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
private final long maximumPollingDelay;
private final boolean isUsingReconciliationFramework;
private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
- private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
- private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
+ private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
+ private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
private List<MultipartType> collectingStatType;
private StatisticsGatheringService<T> statisticsGatheringService;
private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
@Nonnull final ConvertorExecutor convertorExecutor,
@Nonnull final MultipartWriterProvider statisticsWriterProvider,
- @Nonnull final ListeningExecutorService executorService,
- boolean isStatisticsPollingOn,
- boolean isUsingReconciliationFramework,
- long statisticsPollingInterval,
+ @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn,
+ boolean isUsingReconciliationFramework, long statisticsPollingInterval,
long maximumPollingDelay) {
this.deviceContext = deviceContext;
this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
this.isUsingReconciliationFramework = isUsingReconciliationFramework;
statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
- statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
- deviceContext, convertorExecutor, statisticsWriterProvider);
+ statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
+ convertorExecutor,
+ statisticsWriterProvider);
}
@Override
}
@Override
- public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
}
@Override
@Override
public void continueInitializationAfterReconciliation() {
if (deviceContext.initialSubmitTransaction()) {
- contextChainMastershipWatcher.onMasterRoleAcquired(
- deviceInfo,
- ContextChainMastershipState.INITIAL_SUBMIT);
+ contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
startGatheringData();
} else {
- contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
- deviceInfo,
- "Initial transaction cannot be submitted.");
+ contextChainMastershipWatcher
+ .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
}
}
}
collectingStatType = ImmutableList.copyOf(statListForCollecting);
- Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback());
+ Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
}
@Override
@Override
public void close() {
- Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
+ Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable final Void result) {
requestContexts.forEach(requestContext -> RequestContextUtil
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable throwable) {
requestContexts.forEach(requestContext -> RequestContextUtil
.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
}
- });
+ }, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> gatherDynamicData() {
return Futures.immediateFuture(Boolean.TRUE);
}
- return this.lastDataGathering.updateAndGet(future -> {
+ return this.lastDataGatheringRef.updateAndGet(future -> {
// write start timestamp to state snapshot container
StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
// recreate gathering future if it should be recreated
- final ListenableFuture<Boolean> lastDataGathering = Objects.isNull(future) ||
- future.isCancelled() ||
- future.isDone() ?
- Futures.immediateFuture(Boolean.TRUE) :
- future;
+ final ListenableFuture<Boolean> lastDataGathering =
+ Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
+ .immediateFuture(Boolean.TRUE) : future;
// build statistics gathering future
- final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream().reduce(
- lastDataGathering,
- this::statChainFuture,
- (a, b) -> Futures.transformAsync(a, result -> b));
+ final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
+ .reduce(lastDataGathering, this::statChainFuture,
+ (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
+ MoreExecutors.directExecutor()));
// write end timestamp to state snapshot container
Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
}
@Override
- public void onFailure(final Throwable t) {
- if (!(t instanceof TransactionChainClosedException)) {
+ public void onFailure(final Throwable throwable) {
+ if (!(throwable instanceof TransactionChainClosedException)) {
StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
}
}
- });
+ }, MoreExecutors.directExecutor());
return newDataGathering;
});
}
- private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
- if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+ private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
+ final MultipartType multipartType) {
+ if (ConnectionContext.CONNECTION_STATE.RIP
+ .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
final String errMsg = String
.format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
getDeviceInfo().getNodeId(),
final boolean supported = collectingStatType.contains(multipartType);
// TODO: Refactor twice sending deviceContext into gatheringStatistics
- return supported ? StatisticsGatheringUtils.gatherStatistics(
- onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
- getDeviceInfo(),
- multipartType,
- deviceContext,
- deviceContext,
- convertorExecutor,
- statisticsWriterProvider,
- executorService) : Futures.immediateFuture(Boolean.FALSE);
- });
+ return supported ? StatisticsGatheringUtils
+ .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
+ getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
+ statisticsWriterProvider, executorService) : Futures
+ .immediateFuture(Boolean.FALSE);
+ }, MoreExecutors.directExecutor());
}
private void startGatheringData() {
}
LOG.info("Starting statistics gathering for node {}", deviceInfo);
- final StatisticsPollingService statisticsPollingService = new StatisticsPollingService(timeCounter,
- statisticsPollingInterval, maximumPollingDelay,
- StatisticsContextImpl.this::gatherDynamicData);
+ final StatisticsPollingService statisticsPollingService =
+ new StatisticsPollingService(timeCounter,
+ statisticsPollingInterval,
+ maximumPollingDelay,
+ StatisticsContextImpl.this::gatherDynamicData);
schedulingEnabled.set(true);
statisticsPollingService.startAsync();
- this.statisticsPollingService.set(statisticsPollingService);
+ this.statisticsPollingServiceRef.set(statisticsPollingService);
}
private ListenableFuture<Void> stopGatheringData() {
LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
cancelLastDataGathering();
- return Optional
- .ofNullable(statisticsPollingService.getAndSet(null))
- .map(StatisticsPollingService::stop)
+ return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
.orElseGet(() -> Futures.immediateFuture(null));
}
private void cancelLastDataGathering() {
- final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
+ final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
future.cancel(true);
}
@VisibleForTesting
- void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
+ void setStatisticsGatheringOnTheFlyService(
+ final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
}
private final class InitialSubmitCallback implements FutureCallback<Boolean> {
@Override
public void onSuccess(@Nullable final Boolean result) {
- contextChainMastershipWatcher.onMasterRoleAcquired(
- deviceInfo,
- ContextChainMastershipState.INITIAL_GATHERING
- );
+ contextChainMastershipWatcher
+ .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
if (!isUsingReconciliationFramework) {
continueInitializationAfterReconciliation();
}
@Override
- public void onFailure(@Nonnull final Throwable t) {
- contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
- deviceInfo,
- "Initial gathering statistics unsuccessful: " + t.getMessage());
+ public void onFailure(@Nonnull final Throwable throwable) {
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
+ "Initial gathering statistics "
+ + "unsuccessful: "
+ + throwable.getMessage());
}
}
-}
\ No newline at end of file
+}