X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=724531b94f87c2a0028065a3e045e7337e376140;hb=2c9d952924a42a1346768a163ccab684bd0260c8;hp=0d219eb42f25af3233d711f4977bc27e991bcbaf;hpb=17a858fc654709731c6acb8a5870533ccc0b0820;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 0d219eb42f..724531b94f 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -14,6 +14,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -34,11 +36,9 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; -import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; -import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; @@ -53,10 +53,10 @@ class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; - private final ItemLifecycleListener itemLifeCycleListener; private final Collection> requestContexts = new HashSet<>(); private final DeviceContext deviceContext; private final DeviceState devState; + private final ListeningExecutorService executorService; private final boolean isStatisticsPollingOn; private final ConvertorExecutor convertorExecutor; private final MultipartWriterProvider statisticsWriterProvider; @@ -66,8 +66,8 @@ class StatisticsContextImpl implements StatisticsContext { private final long maximumPollingDelay; private final boolean isUsingReconciliationFramework; private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true); - private final AtomicReference> lastDataGathering = new AtomicReference<>(); - private final AtomicReference statisticsPollingService = new AtomicReference<>(); + private final AtomicReference> lastDataGatheringRef = new AtomicReference<>(); + private final AtomicReference statisticsPollingServiceRef = new AtomicReference<>(); private List collectingStatType; private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; @@ -76,15 +76,14 @@ class StatisticsContextImpl implements StatisticsContext { StatisticsContextImpl(@Nonnull final DeviceContext deviceContext, @Nonnull final ConvertorExecutor convertorExecutor, @Nonnull final MultipartWriterProvider statisticsWriterProvider, - boolean isStatisticsPollingOn, - boolean isUsingReconciliationFramework, - long statisticsPollingInterval, + @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn, + boolean isUsingReconciliationFramework, long statisticsPollingInterval, long maximumPollingDelay) { this.deviceContext = deviceContext; this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + this.executorService = executorService; this.isStatisticsPollingOn = isStatisticsPollingOn; this.convertorExecutor = convertorExecutor; - this.itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); this.deviceInfo = deviceContext.getDeviceInfo(); this.statisticsPollingInterval = statisticsPollingInterval; this.maximumPollingDelay = maximumPollingDelay; @@ -92,8 +91,9 @@ class StatisticsContextImpl implements StatisticsContext { this.isUsingReconciliationFramework = isUsingReconciliationFramework; statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, - deviceContext, convertorExecutor, statisticsWriterProvider); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext, + convertorExecutor, + statisticsWriterProvider); } @Override @@ -108,8 +108,8 @@ class StatisticsContextImpl implements StatisticsContext { } @Override - public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) { - this.contextChainMastershipWatcher = contextChainMastershipWatcher; + public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) { + this.contextChainMastershipWatcher = newWatcher; } @Override @@ -128,31 +128,22 @@ class StatisticsContextImpl implements StatisticsContext { @Override public void enableGathering() { this.schedulingEnabled.set(true); - deviceContext.getItemLifeCycleSourceRegistry() - .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource - .setItemLifecycleListener(null)); } @Override public void disableGathering() { this.schedulingEnabled.set(false); - deviceContext.getItemLifeCycleSourceRegistry() - .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource - .setItemLifecycleListener(itemLifeCycleListener)); } @Override public void continueInitializationAfterReconciliation() { if (deviceContext.initialSubmitTransaction()) { - contextChainMastershipWatcher.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_SUBMIT); + contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT); startGatheringData(); } else { - contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial transaction cannot be submitted."); + contextChainMastershipWatcher + .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted."); } } @@ -164,10 +155,6 @@ class StatisticsContextImpl implements StatisticsContext { statListForCollecting.add(MultipartType.OFPMPTABLE); } - if (devState.isFlowStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPFLOW); - } - if (devState.isGroupAvailable()) { statListForCollecting.add(MultipartType.OFPMPGROUPDESC); statListForCollecting.add(MultipartType.OFPMPGROUP); @@ -178,6 +165,10 @@ class StatisticsContextImpl implements StatisticsContext { statListForCollecting.add(MultipartType.OFPMPMETER); } + if (devState.isFlowStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } + if (devState.isPortStatisticsAvailable()) { statListForCollecting.add(MultipartType.OFPMPPORTSTATS); } @@ -187,7 +178,7 @@ class StatisticsContextImpl implements StatisticsContext { } collectingStatType = ImmutableList.copyOf(statListForCollecting); - Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback()); + Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor()); } @Override @@ -197,7 +188,7 @@ class StatisticsContextImpl implements StatisticsContext { @Override public void close() { - Futures.addCallback(stopGatheringData(), new FutureCallback() { + Futures.addCallback(stopGatheringData(), new FutureCallback() { @Override public void onSuccess(@Nullable final Void result) { requestContexts.forEach(requestContext -> RequestContextUtil @@ -205,11 +196,11 @@ class StatisticsContextImpl implements StatisticsContext { } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable throwable) { requestContexts.forEach(requestContext -> RequestContextUtil .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); } - }); + }, MoreExecutors.directExecutor()); } private ListenableFuture gatherDynamicData() { @@ -218,22 +209,20 @@ class StatisticsContextImpl implements StatisticsContext { return Futures.immediateFuture(Boolean.TRUE); } - return this.lastDataGathering.updateAndGet(future -> { + return this.lastDataGatheringRef.updateAndGet(future -> { // write start timestamp to state snapshot container StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext); // recreate gathering future if it should be recreated - final ListenableFuture lastDataGathering = Objects.isNull(future) || - future.isCancelled() || - future.isDone() ? - Futures.immediateFuture(Boolean.TRUE) : - future; + final ListenableFuture lastDataGathering = + Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures + .immediateFuture(Boolean.TRUE) : future; // build statistics gathering future - final ListenableFuture newDataGathering = collectingStatType.stream().reduce( - lastDataGathering, - this::statChainFuture, - (a, b) -> Futures.transformAsync(a, result -> b)); + final ListenableFuture newDataGathering = collectingStatType.stream() + .reduce(lastDataGathering, this::statChainFuture, + (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn, + MoreExecutors.directExecutor())); // write end timestamp to state snapshot container Futures.addCallback(newDataGathering, new FutureCallback() { @@ -243,19 +232,21 @@ class StatisticsContextImpl implements StatisticsContext { } @Override - public void onFailure(final Throwable t) { - if (!(t instanceof TransactionChainClosedException)) { + public void onFailure(final Throwable throwable) { + if (!(throwable instanceof TransactionChainClosedException)) { StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false); } } - }); + }, MoreExecutors.directExecutor()); return newDataGathering; }); } - private ListenableFuture statChainFuture(final ListenableFuture prevFuture, final MultipartType multipartType) { - if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + private ListenableFuture statChainFuture(final ListenableFuture prevFuture, + final MultipartType multipartType) { + if (ConnectionContext.CONNECTION_STATE.RIP + .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { final String errMsg = String .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s", getDeviceInfo().getNodeId(), @@ -265,21 +256,18 @@ class StatisticsContextImpl implements StatisticsContext { } return Futures.transformAsync(prevFuture, result -> { - LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result); + LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result); LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType); final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType); final boolean supported = collectingStatType.contains(multipartType); // TODO: Refactor twice sending deviceContext into gatheringStatistics - return supported ? StatisticsGatheringUtils.gatherStatistics( - onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService, - getDeviceInfo(), - multipartType, - deviceContext, - deviceContext, - convertorExecutor, - statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE); - }); + return supported ? StatisticsGatheringUtils + .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService, + getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor, + statisticsWriterProvider, executorService) : Futures + .immediateFuture(Boolean.FALSE); + }, MoreExecutors.directExecutor()); } private void startGatheringData() { @@ -288,27 +276,27 @@ class StatisticsContextImpl implements StatisticsContext { } LOG.info("Starting statistics gathering for node {}", deviceInfo); - final StatisticsPollingService statisticsPollingService = new StatisticsPollingService(timeCounter, - statisticsPollingInterval, maximumPollingDelay, - StatisticsContextImpl.this::gatherDynamicData); + final StatisticsPollingService statisticsPollingService = + new StatisticsPollingService(timeCounter, + statisticsPollingInterval, + maximumPollingDelay, + StatisticsContextImpl.this::gatherDynamicData); schedulingEnabled.set(true); statisticsPollingService.startAsync(); - this.statisticsPollingService.set(statisticsPollingService); + this.statisticsPollingServiceRef.set(statisticsPollingService); } private ListenableFuture stopGatheringData() { LOG.info("Stopping running statistics gathering for node {}", deviceInfo); cancelLastDataGathering(); - return Optional - .ofNullable(statisticsPollingService.getAndSet(null)) - .map(StatisticsPollingService::stop) + return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop) .orElseGet(() -> Futures.immediateFuture(null)); } private void cancelLastDataGathering() { - final ListenableFuture future = lastDataGathering.getAndSet(null); + final ListenableFuture future = lastDataGatheringRef.getAndSet(null); if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) { future.cancel(true); @@ -321,17 +309,16 @@ class StatisticsContextImpl implements StatisticsContext { } @VisibleForTesting - void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { + void setStatisticsGatheringOnTheFlyService( + final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } private final class InitialSubmitCallback implements FutureCallback { @Override public void onSuccess(@Nullable final Boolean result) { - contextChainMastershipWatcher.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_GATHERING - ); + contextChainMastershipWatcher + .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING); if (!isUsingReconciliationFramework) { continueInitializationAfterReconciliation(); @@ -339,10 +326,11 @@ class StatisticsContextImpl implements StatisticsContext { } @Override - public void onFailure(@Nonnull final Throwable t) { - contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial gathering statistics unsuccessful: " + t.getMessage()); + public void onFailure(@Nonnull final Throwable throwable) { + contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo, + "Initial gathering statistics " + + "unsuccessful: " + + throwable.getMessage()); } } -} \ No newline at end of file +}