Merge "OPNFLWPLUG-929 : Remove deprecated guava library"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
index d2b909f1979f125c7c17652794fa4a7791d4b0e2..61fd206c2fbe4f0ea7474cadfe3bdb6b10f5aac1 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -65,8 +66,8 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     private final long maximumPollingDelay;
     private final boolean isUsingReconciliationFramework;
     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
-    private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
-    private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
+    private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
+    private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
     private List<MultipartType> collectingStatType;
     private StatisticsGatheringService<T> statisticsGatheringService;
     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
@@ -75,10 +76,8 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
                           @Nonnull final ConvertorExecutor convertorExecutor,
                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
-                          @Nonnull final ListeningExecutorService executorService,
-                          boolean isStatisticsPollingOn,
-                          boolean isUsingReconciliationFramework,
-                          long statisticsPollingInterval,
+                          @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn,
+                          boolean isUsingReconciliationFramework, long statisticsPollingInterval,
                           long maximumPollingDelay) {
         this.deviceContext = deviceContext;
         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
@@ -92,8 +91,9 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
 
         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
-        statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
-                deviceContext, convertorExecutor, statisticsWriterProvider);
+        statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
+                                                                                      convertorExecutor,
+                                                                                      statisticsWriterProvider);
     }
 
     @Override
@@ -108,8 +108,8 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @Override
-    public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
-        this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+    public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+        this.contextChainMastershipWatcher = newWatcher;
     }
 
     @Override
@@ -138,15 +138,12 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     @Override
     public void continueInitializationAfterReconciliation() {
         if (deviceContext.initialSubmitTransaction()) {
-            contextChainMastershipWatcher.onMasterRoleAcquired(
-                    deviceInfo,
-                    ContextChainMastershipState.INITIAL_SUBMIT);
+            contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
 
             startGatheringData();
         } else {
-            contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
-                    deviceInfo,
-                    "Initial transaction cannot be submitted.");
+            contextChainMastershipWatcher
+                    .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
         }
     }
 
@@ -181,7 +178,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
         }
 
         collectingStatType = ImmutableList.copyOf(statListForCollecting);
-        Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback());
+        Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
     }
 
     @Override
@@ -191,7 +188,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
     @Override
     public void close() {
-         Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
+        Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable final Void result) {
                 requestContexts.forEach(requestContext -> RequestContextUtil
@@ -199,11 +196,11 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             }
 
             @Override
-            public void onFailure(final Throwable t) {
+            public void onFailure(final Throwable throwable) {
                 requestContexts.forEach(requestContext -> RequestContextUtil
                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     private ListenableFuture<Boolean> gatherDynamicData() {
@@ -212,22 +209,20 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             return Futures.immediateFuture(Boolean.TRUE);
         }
 
-        return this.lastDataGathering.updateAndGet(future -> {
+        return this.lastDataGatheringRef.updateAndGet(future -> {
             // write start timestamp to state snapshot container
             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
 
             // recreate gathering future if it should be recreated
-            final ListenableFuture<Boolean> lastDataGathering = Objects.isNull(future) ||
-                    future.isCancelled() ||
-                    future.isDone() ?
-                    Futures.immediateFuture(Boolean.TRUE) :
-                    future;
+            final ListenableFuture<Boolean> lastDataGathering =
+                    Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
+                            .immediateFuture(Boolean.TRUE) : future;
 
             // build statistics gathering future
-            final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream().reduce(
-                    lastDataGathering,
-                    this::statChainFuture,
-                    (a, b) -> Futures.transformAsync(a, result -> b));
+            final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
+                    .reduce(lastDataGathering, this::statChainFuture,
+                        (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
+                                MoreExecutors.directExecutor()));
 
             // write end timestamp to state snapshot container
             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
@@ -237,19 +232,21 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
                 }
 
                 @Override
-                public void onFailure(final Throwable t) {
-                    if (!(t instanceof TransactionChainClosedException)) {
+                public void onFailure(final Throwable throwable) {
+                    if (!(throwable instanceof TransactionChainClosedException)) {
                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
                     }
                 }
-            });
+            }, MoreExecutors.directExecutor());
 
             return newDataGathering;
         });
     }
 
-    private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
-        if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+    private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
+                                                      final MultipartType multipartType) {
+        if (ConnectionContext.CONNECTION_STATE.RIP
+                .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
             final String errMsg = String
                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
                             getDeviceInfo().getNodeId(),
@@ -265,16 +262,12 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             final boolean supported = collectingStatType.contains(multipartType);
 
             // TODO: Refactor twice sending deviceContext into gatheringStatistics
-            return supported ? StatisticsGatheringUtils.gatherStatistics(
-                    onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
-                    getDeviceInfo(),
-                    multipartType,
-                    deviceContext,
-                    deviceContext,
-                    convertorExecutor,
-                    statisticsWriterProvider,
-                    executorService) : Futures.immediateFuture(Boolean.FALSE);
-        });
+            return supported ? StatisticsGatheringUtils
+                    .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
+                                      getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
+                                      statisticsWriterProvider, executorService) : Futures
+                    .immediateFuture(Boolean.FALSE);
+        }, MoreExecutors.directExecutor());
     }
 
     private void startGatheringData() {
@@ -283,27 +276,27 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
         }
 
         LOG.info("Starting statistics gathering for node {}", deviceInfo);
-        final StatisticsPollingService statisticsPollingService = new StatisticsPollingService(timeCounter,
-                statisticsPollingInterval, maximumPollingDelay,
-                StatisticsContextImpl.this::gatherDynamicData);
+        final StatisticsPollingService statisticsPollingService =
+                new StatisticsPollingService(timeCounter,
+                                             statisticsPollingInterval,
+                                             maximumPollingDelay,
+                                             StatisticsContextImpl.this::gatherDynamicData);
 
         schedulingEnabled.set(true);
         statisticsPollingService.startAsync();
-        this.statisticsPollingService.set(statisticsPollingService);
+        this.statisticsPollingServiceRef.set(statisticsPollingService);
     }
 
     private ListenableFuture<Void> stopGatheringData() {
         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
         cancelLastDataGathering();
 
-        return Optional
-                .ofNullable(statisticsPollingService.getAndSet(null))
-                .map(StatisticsPollingService::stop)
+        return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
                 .orElseGet(() -> Futures.immediateFuture(null));
     }
 
     private void cancelLastDataGathering() {
-        final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
+        final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
 
         if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
             future.cancel(true);
@@ -316,17 +309,16 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @VisibleForTesting
-    void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
+    void setStatisticsGatheringOnTheFlyService(
+            final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
     }
 
     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
         @Override
         public void onSuccess(@Nullable final Boolean result) {
-            contextChainMastershipWatcher.onMasterRoleAcquired(
-                    deviceInfo,
-                    ContextChainMastershipState.INITIAL_GATHERING
-            );
+            contextChainMastershipWatcher
+                    .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
 
             if (!isUsingReconciliationFramework) {
                 continueInitializationAfterReconciliation();
@@ -334,10 +326,11 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
         }
 
         @Override
-        public void onFailure(@Nonnull final Throwable t) {
-            contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
-                    deviceInfo,
-                    "Initial gathering statistics unsuccessful: " + t.getMessage());
+        public void onFailure(@Nonnull final Throwable throwable) {
+            contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
+                                                                              "Initial gathering statistics "
+                                                                                      + "unsuccessful: "
+                                                                                      + throwable.getMessage());
         }
     }
-}
\ No newline at end of file
+}