Bump mdsal to 5.0.2
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
index 9b246c4a709784530f70b90b0ff9de3fc468fdf7..0fb57f9a11411c06ab542b97bb9d3f632eb73506 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.impl.statistics;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -18,15 +17,13 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
+import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -36,6 +33,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
@@ -45,15 +43,16 @@ import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.Statis
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
+class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext, DeviceInitializationContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
     private static final String CONNECTION_CLOSED = "Connection closed.";
 
-    private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
+    private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
     private final DeviceContext deviceContext;
     private final DeviceState devState;
     private final ListeningExecutorService executorService;
@@ -62,12 +61,13 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     private final MultipartWriterProvider statisticsWriterProvider;
     private final DeviceInfo deviceInfo;
     private final TimeCounter timeCounter = new TimeCounter();
+    private final OpenflowProviderConfig config;
     private final long statisticsPollingInterval;
     private final long maximumPollingDelay;
     private final boolean isUsingReconciliationFramework;
     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
-    private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
-    private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
+    private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
+    private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
     private List<MultipartType> collectingStatType;
     private StatisticsGatheringService<T> statisticsGatheringService;
     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
@@ -76,17 +76,19 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
                           @Nonnull final ConvertorExecutor convertorExecutor,
                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
-                          @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn,
-                          boolean isUsingReconciliationFramework, long statisticsPollingInterval,
-                          long maximumPollingDelay) {
+                          @Nonnull final ListeningExecutorService executorService,
+                          @Nonnull final OpenflowProviderConfig config,
+                          final boolean isStatisticsPollingOn,
+                          final boolean isUsingReconciliationFramework) {
         this.deviceContext = deviceContext;
         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
         this.executorService = executorService;
         this.isStatisticsPollingOn = isStatisticsPollingOn;
+        this.config = config;
         this.convertorExecutor = convertorExecutor;
         this.deviceInfo = deviceContext.getDeviceInfo();
-        this.statisticsPollingInterval = statisticsPollingInterval;
-        this.maximumPollingDelay = maximumPollingDelay;
+        this.statisticsPollingInterval = config.getBasicTimerDelay().getValue().toJava();
+        this.maximumPollingDelay = config.getMaximumTimerDelay().getValue().toJava();
         this.statisticsWriterProvider = statisticsWriterProvider;
         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
 
@@ -108,13 +110,13 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @Override
-    public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
-        this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+    public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+        this.contextChainMastershipWatcher = newWatcher;
     }
 
     @Override
     public <O> RequestContext<O> createRequestContext() {
-        final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
+        final AbstractRequestContext<O> ret = new AbstractRequestContext<>(deviceInfo.reserveXidForDeviceMessage()) {
             @Override
             public void close() {
                 requestContexts.remove(this);
@@ -149,31 +151,36 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
     @Override
     public void instantiateServiceInstance() {
+
+    }
+
+    @Override
+    public void initializeDevice() {
         final List<MultipartType> statListForCollecting = new ArrayList<>();
 
-        if (devState.isTableStatisticsAvailable()) {
+        if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
             statListForCollecting.add(MultipartType.OFPMPTABLE);
         }
 
-        if (devState.isFlowStatisticsAvailable()) {
-            statListForCollecting.add(MultipartType.OFPMPFLOW);
-        }
-
-        if (devState.isGroupAvailable()) {
+        if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
             statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
             statListForCollecting.add(MultipartType.OFPMPGROUP);
         }
 
-        if (devState.isMetersAvailable()) {
+        if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
             statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
             statListForCollecting.add(MultipartType.OFPMPMETER);
         }
 
-        if (devState.isPortStatisticsAvailable()) {
+        if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
+            statListForCollecting.add(MultipartType.OFPMPFLOW);
+        }
+
+        if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
             statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
         }
 
-        if (devState.isQueueStatisticsAvailable()) {
+        if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
             statListForCollecting.add(MultipartType.OFPMPQUEUE);
         }
 
@@ -190,7 +197,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     public void close() {
         Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(@Nullable final Void result) {
+            public void onSuccess(final Void result) {
                 requestContexts.forEach(requestContext -> RequestContextUtil
                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
             }
@@ -209,19 +216,19 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             return Futures.immediateFuture(Boolean.TRUE);
         }
 
-        return this.lastDataGathering.updateAndGet(future -> {
+        return this.lastDataGatheringRef.updateAndGet(future -> {
             // write start timestamp to state snapshot container
             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
 
             // recreate gathering future if it should be recreated
-            final ListenableFuture<Boolean> lastDataGathering =
-                    Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
-                            .immediateFuture(Boolean.TRUE) : future;
+            final ListenableFuture<Boolean> lastDataGathering = future == null || future.isCancelled()
+                    || future.isDone() ? Futures.immediateFuture(Boolean.TRUE) : future;
 
             // build statistics gathering future
             final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
                     .reduce(lastDataGathering, this::statChainFuture,
-                        (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn));
+                        (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
+                                MoreExecutors.directExecutor()));
 
             // write end timestamp to state snapshot container
             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
@@ -266,7 +273,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
                                       getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
                                       statisticsWriterProvider, executorService) : Futures
                     .immediateFuture(Boolean.FALSE);
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     private void startGatheringData() {
@@ -283,21 +290,21 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
         schedulingEnabled.set(true);
         statisticsPollingService.startAsync();
-        this.statisticsPollingService.set(statisticsPollingService);
+        this.statisticsPollingServiceRef.set(statisticsPollingService);
     }
 
     private ListenableFuture<Void> stopGatheringData() {
         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
         cancelLastDataGathering();
 
-        return Optional.ofNullable(statisticsPollingService.getAndSet(null)).map(StatisticsPollingService::stop)
+        return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
                 .orElseGet(() -> Futures.immediateFuture(null));
     }
 
     private void cancelLastDataGathering() {
-        final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
+        final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
 
-        if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
+        if (future != null && !future.isDone() && !future.isCancelled()) {
             future.cancel(true);
         }
     }
@@ -315,17 +322,14 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
         @Override
-        public void onSuccess(@Nullable final Boolean result) {
-            contextChainMastershipWatcher
-                    .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
-
+        public void onSuccess(final Boolean result) {
             if (!isUsingReconciliationFramework) {
                 continueInitializationAfterReconciliation();
             }
         }
 
         @Override
-        public void onFailure(@Nonnull final Throwable throwable) {
+        public void onFailure(final Throwable throwable) {
             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
                                                                               "Initial gathering statistics "
                                                                                       + "unsuccessful: "