X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=61fd206c2fbe4f0ea7474cadfe3bdb6b10f5aac1;hb=ec9f1ec34c63ac0635dd0e0763f51c7b7d3928e6;hp=1f1ab71e1b72c7a5e88b851c42dd407cc98202b9;hpb=939378c3ca5ecc7866615bc624c0ebfff78d08d2;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 1f1ab71e1b..61fd206c2f 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -9,25 +9,23 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import io.netty.util.Timeout; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.mdsal.common.api.TransactionChainClosedException; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.ConnectionException; @@ -36,16 +34,12 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener; -import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; -import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil; -import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; @@ -59,511 +53,284 @@ class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; - private final ItemLifecycleListener itemLifeCycleListener; private final Collection> requestContexts = new HashSet<>(); private final DeviceContext deviceContext; private final DeviceState devState; - private final ListenableFuture emptyFuture; + private final ListeningExecutorService executorService; private final boolean isStatisticsPollingOn; - private final Object collectionStatTypeLock = new Object(); private final ConvertorExecutor convertorExecutor; private final MultipartWriterProvider statisticsWriterProvider; - @GuardedBy("collectionStatTypeLock") + private final DeviceInfo deviceInfo; + private final TimeCounter timeCounter = new TimeCounter(); + private final long statisticsPollingInterval; + private final long maximumPollingDelay; + private final boolean isUsingReconciliationFramework; + private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true); + private final AtomicReference> lastDataGatheringRef = new AtomicReference<>(); + private final AtomicReference statisticsPollingServiceRef = new AtomicReference<>(); private List collectingStatType; - private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; - private Timeout pollTimeout; - private final DeviceInfo deviceInfo; - private final StatisticsManager myManager; + private ContextChainMastershipWatcher contextChainMastershipWatcher; - private volatile boolean schedulingEnabled; - private volatile CONTEXT_STATE state; - private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; - private ClusterInitializationPhaseHandler initialSubmitHandler; - - private ListenableFuture lastDataGathering; - - StatisticsContextImpl(final boolean isStatisticsPollingOn, - @Nonnull final DeviceContext deviceContext, + StatisticsContextImpl(@Nonnull final DeviceContext deviceContext, @Nonnull final ConvertorExecutor convertorExecutor, - @Nonnull final StatisticsManager myManager, - @Nonnull final MultipartWriterProvider statisticsWriterProvider) { + @Nonnull final MultipartWriterProvider statisticsWriterProvider, + @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn, + boolean isUsingReconciliationFramework, long statisticsPollingInterval, + long maximumPollingDelay) { this.deviceContext = deviceContext; this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + this.executorService = executorService; this.isStatisticsPollingOn = isStatisticsPollingOn; this.convertorExecutor = convertorExecutor; - emptyFuture = Futures.immediateFuture(false); - statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, - deviceContext, convertorExecutor, statisticsWriterProvider); - itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); - statListForCollectingInitialization(); - this.state = CONTEXT_STATE.INITIALIZATION; this.deviceInfo = deviceContext.getDeviceInfo(); - this.myManager = myManager; - this.lastDataGathering = null; + this.statisticsPollingInterval = statisticsPollingInterval; + this.maximumPollingDelay = maximumPollingDelay; this.statisticsWriterProvider = statisticsWriterProvider; - } + this.isUsingReconciliationFramework = isUsingReconciliationFramework; - @Override - public void statListForCollectingInitialization() { - synchronized (collectionStatTypeLock) { - final List statListForCollecting = new ArrayList<>(); - if (devState.isTableStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPTABLE); - } - if (devState.isFlowStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPFLOW); - } - if (devState.isGroupAvailable()) { - statListForCollecting.add(MultipartType.OFPMPGROUPDESC); - statListForCollecting.add(MultipartType.OFPMPGROUP); - } - if (devState.isMetersAvailable()) { - statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); - statListForCollecting.add(MultipartType.OFPMPMETER); - } - if (devState.isPortStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPPORTSTATS); - } - if (devState.isQueueStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPQUEUE); - } - collectingStatType = ImmutableList.copyOf(statListForCollecting); - } + statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext, + convertorExecutor, + statisticsWriterProvider); } - @Override - public ListenableFuture initialGatherDynamicData() { - return gatherDynamicData(true); + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; } + @Nonnull @Override - public ListenableFuture gatherDynamicData(){ - return gatherDynamicData(false); - } - - private ListenableFuture gatherDynamicData(final boolean initial) { - this.lastDataGathering = null; - if (!isStatisticsPollingOn) { - LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); - return Futures.immediateFuture(Boolean.TRUE); - } - final ListenableFuture errorResultFuture = deviceConnectionCheck(); - if (errorResultFuture != null) { - return errorResultFuture; - } - synchronized (collectionStatTypeLock) { - final Iterator statIterator = collectingStatType.iterator(); - final SettableFuture settableStatResultFuture = SettableFuture.create(); - - // write start timestamp to state snapshot container - StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext); - - statChainFuture(statIterator, settableStatResultFuture, initial); - - // write end timestamp to state snapshot container - Futures.addCallback(settableStatResultFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable final Boolean result) { - StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true); - } - @Override - public void onFailure(final Throwable t) { - if (!(t instanceof TransactionChainClosedException)) { - StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); - } - } - }); - this.lastDataGathering = settableStatResultFuture; - return settableStatResultFuture; - } + public ServiceGroupIdentifier getIdentifier() { + return deviceInfo.getServiceIdentifier(); } - private ListenableFuture chooseStat(final MultipartType multipartType, final boolean initial){ - ListenableFuture result = Futures.immediateCheckedFuture(Boolean.TRUE); - - switch (multipartType) { - case OFPMPFLOW: - result = collectFlowStatistics(multipartType, initial); - break; - case OFPMPTABLE: - result = collectTableStatistics(multipartType); - break; - case OFPMPPORTSTATS: - result = collectPortStatistics(multipartType); - break; - case OFPMPQUEUE: - result = collectQueueStatistics(multipartType); - break; - case OFPMPGROUPDESC: - result = collectGroupDescStatistics(multipartType); - break; - case OFPMPGROUP: - result = collectGroupStatistics(multipartType); - break; - case OFPMPMETERCONFIG: - result = collectMeterConfigStatistics(multipartType); - break; - case OFPMPMETER: - result = collectMeterStatistics(multipartType); - break; - default: - LOG.warn("Unsupported Statistics type {}", multipartType); - } - - return result; + @Override + public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) { + this.contextChainMastershipWatcher = newWatcher; } - @Override - public RequestContext createRequestContext() { - final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { + public RequestContext createRequestContext() { + final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { @Override public void close() { requestContexts.remove(this); } }; + requestContexts.add(ret); return ret; } @Override - public void close() { - if (CONTEXT_STATE.TERMINATION.equals(getState())) { - if (LOG.isDebugEnabled()) { - LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); - } - } else { - - this.state = CONTEXT_STATE.TERMINATION; - - for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); - iterator.hasNext(); ) { - RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); - } - - if (null != pollTimeout && !pollTimeout.isExpired()) { - pollTimeout.cancel(); - } - } + public void enableGathering() { + this.schedulingEnabled.set(true); } @Override - public void setSchedulingEnabled(final boolean schedulingEnabled) { - this.schedulingEnabled = schedulingEnabled; + public void disableGathering() { + this.schedulingEnabled.set(false); } @Override - public boolean isSchedulingEnabled() { - return schedulingEnabled; - } + public void continueInitializationAfterReconciliation() { + if (deviceContext.initialSubmitTransaction()) { + contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT); - @Override - public void setPollTimeout(final Timeout pollTimeout) { - this.pollTimeout = pollTimeout; + startGatheringData(); + } else { + contextChainMastershipWatcher + .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted."); + } } @Override - public Optional getPollTimeout() { - return Optional.ofNullable(pollTimeout); - } + public void instantiateServiceInstance() { + final List statListForCollecting = new ArrayList<>(); - private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture, final boolean initial) { - if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - final String errMsg = String.format("Device connection is closed for Node : %s.", - getDeviceInfo().getNodeId()); - LOG.debug(errMsg); - resultFuture.setException(new ConnectionException(errMsg)); - return; + if (devState.isTableStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPTABLE); } - if (!iterator.hasNext()) { - if (initial) { - Futures.addCallback(StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - MultipartType.OFPMPPORTDESC, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider), new FutureCallback() { - @Override - public void onSuccess(final Boolean result) { - statChainFuture(iterator, resultFuture, false); - } - @Override - public void onFailure(@Nonnull final Throwable t) { - resultFuture.setException(t); - } - }); - - return; - } + if (devState.isFlowStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } - resultFuture.set(Boolean.TRUE); - LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue()); - return; + if (devState.isGroupAvailable()) { + statListForCollecting.add(MultipartType.OFPMPGROUPDESC); + statListForCollecting.add(MultipartType.OFPMPGROUP); } - final MultipartType nextType = iterator.next(); - LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType); + if (devState.isMetersAvailable()) { + statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); + statListForCollecting.add(MultipartType.OFPMPMETER); + } - final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType, initial); - Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { - @Override - public void onSuccess(final Boolean result) { - statChainFuture(iterator, resultFuture, initial); - } - @Override - public void onFailure(@Nonnull final Throwable t) { - resultFuture.setException(t); - } - }); - } + if (devState.isPortStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPPORTSTATS); + } - /** - * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture - * which has to be returned from caller too - * - * @return future - */ - @VisibleForTesting - ListenableFuture deviceConnectionCheck() { - if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - ListenableFuture resultingFuture; - switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { - case RIP: - final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", - deviceContext.getPrimaryConnectionContext().getConnectionState()); - resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); - break; - default: - resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE); - break; - } - return resultingFuture; + if (devState.isQueueStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPQUEUE); } - return null; - } - //TODO: Refactor twice sending deviceContext into gatheringStatistics - private ListenableFuture collectFlowStatistics(final MultipartType multipartType, final boolean initial) { - return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringOnTheFlyService, - getDeviceInfo(), - /*MultipartType.OFPMPFLOW*/ multipartType, - deviceContext, - deviceContext, - initial, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; + collectingStatType = ImmutableList.copyOf(statListForCollecting); + Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor()); } - private ListenableFuture collectTableStatistics(final MultipartType multipartType) { - return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPTABLE*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; + @Override + public ListenableFuture closeServiceInstance() { + return stopGatheringData(); } - private ListenableFuture collectPortStatistics(final MultipartType multipartType) { - return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPPORTSTATS*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; - } + @Override + public void close() { + Futures.addCallback(stopGatheringData(), new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); + } - private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { - return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPQUEUE*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider); + @Override + public void onFailure(final Throwable throwable) { + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); + } + }, MoreExecutors.directExecutor()); } - private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { - return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPGROUPDESC*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; - } + private ListenableFuture gatherDynamicData() { + if (!isStatisticsPollingOn || !schedulingEnabled.get()) { + LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue()); + return Futures.immediateFuture(Boolean.TRUE); + } - private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { - return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPGROUP*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; - } + return this.lastDataGatheringRef.updateAndGet(future -> { + // write start timestamp to state snapshot container + StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext); - private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { - return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPMETERCONFIG*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; - } + // recreate gathering future if it should be recreated + final ListenableFuture lastDataGathering = + Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures + .immediateFuture(Boolean.TRUE) : future; - private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { - return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), - /*MultipartType.OFPMPMETER*/ multipartType, - deviceContext, - deviceContext, - false, - convertorExecutor, - statisticsWriterProvider) : emptyFuture; - } + // build statistics gathering future + final ListenableFuture newDataGathering = collectingStatType.stream() + .reduce(lastDataGathering, this::statChainFuture, + (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn, + MoreExecutors.directExecutor())); - @VisibleForTesting - void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { - this.statisticsGatheringService = statisticsGatheringService; - } + // write end timestamp to state snapshot container + Futures.addCallback(newDataGathering, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result); + } - @VisibleForTesting - void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { - this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; - } + @Override + public void onFailure(final Throwable throwable) { + if (!(throwable instanceof TransactionChainClosedException)) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false); + } + } + }, MoreExecutors.directExecutor()); - @Override - public ItemLifecycleListener getItemLifeCycleListener () { - return itemLifeCycleListener; + return newDataGathering; + }); } - @Override - public CONTEXT_STATE getState() { - return this.state; - } + private ListenableFuture statChainFuture(final ListenableFuture prevFuture, + final MultipartType multipartType) { + if (ConnectionContext.CONNECTION_STATE.RIP + .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + final String errMsg = String + .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s", + getDeviceInfo().getNodeId(), + deviceContext.getPrimaryConnectionContext().getConnectionState()); - @Override - public ServiceGroupIdentifier getServiceIdentifier() { - return this.deviceInfo.getServiceIdentifier(); - } + return Futures.immediateFailedFuture(new ConnectionException(errMsg)); + } - @Override - public DeviceInfo getDeviceInfo() { - return this.deviceInfo; + return Futures.transformAsync(prevFuture, result -> { + LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result); + LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType); + final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType); + final boolean supported = collectingStatType.contains(multipartType); + + // TODO: Refactor twice sending deviceContext into gatheringStatistics + return supported ? StatisticsGatheringUtils + .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService, + getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor, + statisticsWriterProvider, executorService) : Futures + .immediateFuture(Boolean.FALSE); + }, MoreExecutors.directExecutor()); } - @Override - public ListenableFuture stopClusterServices() { - if (CONTEXT_STATE.TERMINATION.equals(this.state)) { - return Futures.immediateCancelledFuture(); + private void startGatheringData() { + if (!isStatisticsPollingOn) { + return; } - return Futures.transform(Futures.immediateFuture(null), new Function() { - @Nullable - @Override - public Void apply(@Nullable Object input) { - schedulingEnabled = false; - stopGatheringData(); - return null; - } - }); - } + LOG.info("Starting statistics gathering for node {}", deviceInfo); + final StatisticsPollingService statisticsPollingService = + new StatisticsPollingService(timeCounter, + statisticsPollingInterval, + maximumPollingDelay, + StatisticsContextImpl.this::gatherDynamicData); - @Override - public DeviceState gainDeviceState() { - return gainDeviceContext().getDeviceState(); + schedulingEnabled.set(true); + statisticsPollingService.startAsync(); + this.statisticsPollingServiceRef.set(statisticsPollingService); } - @Override - public DeviceContext gainDeviceContext() { - return this.deviceContext; + private ListenableFuture stopGatheringData() { + LOG.info("Stopping running statistics gathering for node {}", deviceInfo); + cancelLastDataGathering(); + + return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop) + .orElseGet(() -> Futures.immediateFuture(null)); } - @Override - public void stopGatheringData() { - if (Objects.nonNull(this.lastDataGathering)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue()); - } + private void cancelLastDataGathering() { + final ListenableFuture future = lastDataGatheringRef.getAndSet(null); - lastDataGathering.cancel(true); + if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) { + future.cancel(true); } } - @Override - public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { - this.clusterInitializationPhaseHandler = handler; + @VisibleForTesting + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { + this.statisticsGatheringService = statisticsGatheringService; } - @Override - public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) { - - LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue()); - - this.statListForCollectingInitialization(); - Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback() { + @VisibleForTesting + void setStatisticsGatheringOnTheFlyService( + final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { + this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; + } - @Override - public void onSuccess(@Nullable Boolean aBoolean) { - mastershipChangeListener.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_GATHERING - ); - if (initialSubmitHandler.initialSubmitTransaction()) { - mastershipChangeListener.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_SUBMIT - ); - if (isStatisticsPollingOn) { - myManager.startScheduling(deviceInfo); - } - } else { - mastershipChangeListener.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial transaction cannot be submitted." - ); - } - } + private final class InitialSubmitCallback implements FutureCallback { + @Override + public void onSuccess(@Nullable final Boolean result) { + contextChainMastershipWatcher + .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING); - @Override - public void onFailure(@Nonnull Throwable throwable) { - mastershipChangeListener.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial gathering statistics unsuccessful." - ); + if (!isUsingReconciliationFramework) { + continueInitializationAfterReconciliation(); } - }); - - return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener); - } + } - @Override - public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) { - this.initialSubmitHandler = initialSubmitHandler; + @Override + public void onFailure(@Nonnull final Throwable throwable) { + contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo, + "Initial gathering statistics " + + "unsuccessful: " + + throwable.getMessage()); + } } }