import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
+
+import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
private static final String CONNECTION_CLOSED = "Connection closed.";
- private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
+ private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
private final DeviceContext deviceContext;
private final DeviceState devState;
private final ListeningExecutorService executorService;
private final MultipartWriterProvider statisticsWriterProvider;
private final DeviceInfo deviceInfo;
private final TimeCounter timeCounter = new TimeCounter();
+ private final OpenflowProviderConfig config;
private final long statisticsPollingInterval;
private final long maximumPollingDelay;
private final boolean isUsingReconciliationFramework;
private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
- private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
- private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
+ private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
+ private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
private List<MultipartType> collectingStatType;
private StatisticsGatheringService<T> statisticsGatheringService;
private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
@Nonnull final ConvertorExecutor convertorExecutor,
@Nonnull final MultipartWriterProvider statisticsWriterProvider,
- @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn,
- boolean isUsingReconciliationFramework, long statisticsPollingInterval,
- long maximumPollingDelay) {
+ @Nonnull final ListeningExecutorService executorService,
+ @Nonnull final OpenflowProviderConfig config,
+ boolean isStatisticsPollingOn,
+ boolean isUsingReconciliationFramework) {
this.deviceContext = deviceContext;
this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
this.executorService = executorService;
this.isStatisticsPollingOn = isStatisticsPollingOn;
+ this.config = config;
this.convertorExecutor = convertorExecutor;
this.deviceInfo = deviceContext.getDeviceInfo();
- this.statisticsPollingInterval = statisticsPollingInterval;
- this.maximumPollingDelay = maximumPollingDelay;
+ this.statisticsPollingInterval = config.getBasicTimerDelay().getValue();
+ this.maximumPollingDelay = config.getMaximumTimerDelay().getValue();
this.statisticsWriterProvider = statisticsWriterProvider;
this.isUsingReconciliationFramework = isUsingReconciliationFramework;
}
@Override
- public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
}
@Override
public void instantiateServiceInstance() {
final List<MultipartType> statListForCollecting = new ArrayList<>();
- if (devState.isTableStatisticsAvailable()) {
+ if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
statListForCollecting.add(MultipartType.OFPMPTABLE);
}
- if (devState.isFlowStatisticsAvailable()) {
- statListForCollecting.add(MultipartType.OFPMPFLOW);
- }
-
- if (devState.isGroupAvailable()) {
+ if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
statListForCollecting.add(MultipartType.OFPMPGROUP);
}
- if (devState.isMetersAvailable()) {
+ if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
statListForCollecting.add(MultipartType.OFPMPMETER);
}
- if (devState.isPortStatisticsAvailable()) {
+ if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
+ statListForCollecting.add(MultipartType.OFPMPFLOW);
+ }
+
+ if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
}
- if (devState.isQueueStatisticsAvailable()) {
+ if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
statListForCollecting.add(MultipartType.OFPMPQUEUE);
}
collectingStatType = ImmutableList.copyOf(statListForCollecting);
- Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback());
+ Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
}
@Override
requestContexts.forEach(requestContext -> RequestContextUtil
.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
}
- });
+ }, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> gatherDynamicData() {
return Futures.immediateFuture(Boolean.TRUE);
}
- return this.lastDataGathering.updateAndGet(future -> {
+ return this.lastDataGatheringRef.updateAndGet(future -> {
// write start timestamp to state snapshot container
StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
// build statistics gathering future
final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
.reduce(lastDataGathering, this::statChainFuture,
- (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn));
+ (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
+ MoreExecutors.directExecutor()));
// write end timestamp to state snapshot container
Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
@Override
- public void onSuccess(final Boolean result) {
+ public void onSuccess(@Nonnull final Boolean result) {
StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
}
StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
}
}
- });
+ }, MoreExecutors.directExecutor());
return newDataGathering;
});
getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
statisticsWriterProvider, executorService) : Futures
.immediateFuture(Boolean.FALSE);
- });
+ }, MoreExecutors.directExecutor());
}
private void startGatheringData() {
schedulingEnabled.set(true);
statisticsPollingService.startAsync();
- this.statisticsPollingService.set(statisticsPollingService);
+ this.statisticsPollingServiceRef.set(statisticsPollingService);
}
private ListenableFuture<Void> stopGatheringData() {
LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
cancelLastDataGathering();
- return Optional.ofNullable(statisticsPollingService.getAndSet(null)).map(StatisticsPollingService::stop)
+ return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
.orElseGet(() -> Futures.immediateFuture(null));
}
private void cancelLastDataGathering() {
- final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
+ final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
future.cancel(true);