X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=0fb57f9a11411c06ab542b97bb9d3f632eb73506;hb=13e1d5e6c0237b9378d60526dd8c1d79db6d2b49;hp=38841db49fdde21d10e461a84b58def400ce9585;hpb=89da5edcc40ca9f589c708844bd5efe46a330486;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 38841db49f..0fb57f9a11 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -5,27 +5,25 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.Timeout; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.binding.api.TransactionChainClosedException; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -35,374 +33,307 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; -import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; -import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StatisticsContextImpl implements StatisticsContext { +class StatisticsContextImpl implements StatisticsContext, DeviceInitializationContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; - private final ItemLifecycleListener itemLifeCycleListener; - private final Collection> requestContexts = new HashSet<>(); + private final Collection> requestContexts = ConcurrentHashMap.newKeySet(); private final DeviceContext deviceContext; private final DeviceState devState; + private final ListeningExecutorService executorService; private final boolean isStatisticsPollingOn; - private final Object collectionStatTypeLock = new Object(); private final ConvertorExecutor convertorExecutor; private final MultipartWriterProvider statisticsWriterProvider; private final DeviceInfo deviceInfo; - private final StatisticsManager myManager; + private final TimeCounter timeCounter = new TimeCounter(); + private final OpenflowProviderConfig config; + private final long statisticsPollingInterval; + private final long maximumPollingDelay; private final boolean isUsingReconciliationFramework; - @GuardedBy("collectionStatTypeLock") + private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true); + private final AtomicReference> lastDataGatheringRef = new AtomicReference<>(); + private final AtomicReference statisticsPollingServiceRef = new AtomicReference<>(); private List collectingStatType; private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; - private Timeout pollTimeout; private ContextChainMastershipWatcher contextChainMastershipWatcher; - private volatile ContextState state = ContextState.INITIALIZATION; - private volatile boolean schedulingEnabled; - private volatile ListenableFuture lastDataGathering; - - StatisticsContextImpl(final boolean isStatisticsPollingOn, - @Nonnull final DeviceContext deviceContext, + StatisticsContextImpl(@Nonnull final DeviceContext deviceContext, @Nonnull final ConvertorExecutor convertorExecutor, - @Nonnull final StatisticsManager myManager, @Nonnull final MultipartWriterProvider statisticsWriterProvider, - boolean isUsingReconciliationFramework) { + @Nonnull final ListeningExecutorService executorService, + @Nonnull final OpenflowProviderConfig config, + final boolean isStatisticsPollingOn, + final boolean isUsingReconciliationFramework) { this.deviceContext = deviceContext; this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + this.executorService = executorService; this.isStatisticsPollingOn = isStatisticsPollingOn; + this.config = config; this.convertorExecutor = convertorExecutor; - statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, - deviceContext, convertorExecutor, statisticsWriterProvider); - itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); - statListForCollectingInitialization(); this.deviceInfo = deviceContext.getDeviceInfo(); - this.myManager = myManager; - this.lastDataGathering = null; + this.statisticsPollingInterval = config.getBasicTimerDelay().getValue().toJava(); + this.maximumPollingDelay = config.getMaximumTimerDelay().getValue().toJava(); this.statisticsWriterProvider = statisticsWriterProvider; this.isUsingReconciliationFramework = isUsingReconciliationFramework; + + statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext, + convertorExecutor, + statisticsWriterProvider); } @Override - public void statListForCollectingInitialization() { - synchronized (collectionStatTypeLock) { - final List statListForCollecting = new ArrayList<>(); - if (devState.isTableStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPTABLE); - } - if (devState.isFlowStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPFLOW); - } - if (devState.isGroupAvailable()) { - statListForCollecting.add(MultipartType.OFPMPGROUPDESC); - statListForCollecting.add(MultipartType.OFPMPGROUP); - } - if (devState.isMetersAvailable()) { - statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); - statListForCollecting.add(MultipartType.OFPMPMETER); - } - if (devState.isPortStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPPORTSTATS); - } - if (devState.isQueueStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPQUEUE); - } - collectingStatType = ImmutableList.copyOf(statListForCollecting); - } + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; } + @Nonnull @Override - public ListenableFuture gatherDynamicData() { - if (!isStatisticsPollingOn) { - LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); - return Futures.immediateFuture(Boolean.TRUE); - } - - if (Objects.isNull(lastDataGathering) - || lastDataGathering.isCancelled() - || lastDataGathering.isDone()) { - lastDataGathering = Futures.immediateFuture(Boolean.TRUE); - } - - synchronized (collectionStatTypeLock) { - // write start timestamp to state snapshot container - StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext); - - lastDataGathering = collectingStatType.stream().reduce( - lastDataGathering, - this::statChainFuture, - (a, b) -> Futures.transformAsync(a, (AsyncFunction) result -> b)); - - // write end timestamp to state snapshot container - Futures.addCallback(lastDataGathering, new FutureCallback() { - @Override - public void onSuccess(final Boolean result) { - StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, result); - } - - @Override - public void onFailure(final Throwable t) { - if (!(t instanceof TransactionChainClosedException)) { - StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); - } - } - }); - } - - return lastDataGathering; + public ServiceGroupIdentifier getIdentifier() { + return deviceInfo.getServiceIdentifier(); } - private ListenableFuture chooseStat(final MultipartType multipartType){ - ListenableFuture result = Futures.immediateCheckedFuture(Boolean.TRUE); - - switch (multipartType) { - case OFPMPFLOW: - result = collectStatistics(multipartType, devState.isFlowStatisticsAvailable(), true); - break; - case OFPMPTABLE: - result = collectStatistics(multipartType, devState.isTableStatisticsAvailable(), false); - break; - case OFPMPPORTSTATS: - result = collectStatistics(multipartType, devState.isPortStatisticsAvailable(), false); - break; - case OFPMPQUEUE: - result = collectStatistics(multipartType, devState.isQueueStatisticsAvailable(), false); - break; - case OFPMPGROUPDESC: - result = collectStatistics(multipartType, devState.isGroupAvailable(), false); - break; - case OFPMPGROUP: - result = collectStatistics(multipartType, devState.isGroupAvailable(), false); - break; - case OFPMPMETERCONFIG: - result = collectStatistics(multipartType, devState.isMetersAvailable(), false); - break; - case OFPMPMETER: - result = collectStatistics(multipartType, devState.isMetersAvailable(), false); - break; - default: - LOG.warn("Unsupported Statistics type {}", multipartType); - } - - return result; + @Override + public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) { + this.contextChainMastershipWatcher = newWatcher; } - @Override - public RequestContext createRequestContext() { - final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { + public RequestContext createRequestContext() { + final AbstractRequestContext ret = new AbstractRequestContext<>(deviceInfo.reserveXidForDeviceMessage()) { @Override public void close() { requestContexts.remove(this); } }; + requestContexts.add(ret); return ret; } @Override - public void close() { - if (ContextState.TERMINATION.equals(state)) { - if (LOG.isDebugEnabled()) { - LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo()); - } - } else { - this.state = ContextState.TERMINATION; - stopGatheringData(); - requestContexts.forEach(requestContext -> RequestContextUtil - .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); - requestContexts.clear(); - } + public void enableGathering() { + this.schedulingEnabled.set(true); } @Override - public void setSchedulingEnabled(final boolean schedulingEnabled) { - this.schedulingEnabled = schedulingEnabled; + public void disableGathering() { + this.schedulingEnabled.set(false); } @Override - public boolean isSchedulingEnabled() { - return schedulingEnabled; - } + public void continueInitializationAfterReconciliation() { + if (deviceContext.initialSubmitTransaction()) { + contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT); - @Override - public void setPollTimeout(final Timeout pollTimeout) { - this.pollTimeout = pollTimeout; + startGatheringData(); + } else { + contextChainMastershipWatcher + .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted."); + } } - private ListenableFuture statChainFuture(final ListenableFuture prevFuture, final MultipartType multipartType) { - return Futures.transformAsync(deviceConnectionCheck(), (AsyncFunction) connectionResult -> Futures - .transformAsync(prevFuture, (AsyncFunction) result -> { - LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result); - LOG.debug("Stats iterating to next type for node {} of type {}", - deviceInfo, - multipartType); + @Override + public void instantiateServiceInstance() { - return chooseStat(multipartType); - })); } - @VisibleForTesting - ListenableFuture deviceConnectionCheck() { - if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - final String errMsg = String - .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s", - getDeviceInfo().getNodeId(), - deviceContext.getPrimaryConnectionContext().getConnectionState()); + @Override + public void initializeDevice() { + final List statListForCollecting = new ArrayList<>(); - return Futures.immediateFailedFuture(new ConnectionException(errMsg)); + if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPTABLE); } - return Futures.immediateFuture(Boolean.TRUE); - } + if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPGROUPDESC); + statListForCollecting.add(MultipartType.OFPMPGROUP); + } - private ListenableFuture collectStatistics(final MultipartType multipartType, - final boolean supported, - final boolean onTheFly) { - // TODO: Refactor twice sending deviceContext into gatheringStatistics - return supported ? StatisticsGatheringUtils.gatherStatistics( - onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService, - getDeviceInfo(), - multipartType, - deviceContext, - deviceContext, - convertorExecutor, - statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE); - } + if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); + statListForCollecting.add(MultipartType.OFPMPMETER); + } - @VisibleForTesting - void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { - this.statisticsGatheringService = statisticsGatheringService; - } + if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } - @VisibleForTesting - void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { - this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; - } + if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPPORTSTATS); + } - @Override - public ItemLifecycleListener getItemLifeCycleListener () { - return itemLifeCycleListener; - } + if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) { + statListForCollecting.add(MultipartType.OFPMPQUEUE); + } - @Override - public DeviceInfo getDeviceInfo() { - return this.deviceInfo; + collectingStatType = ImmutableList.copyOf(statListForCollecting); + Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor()); } @Override - public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) { - this.contextChainMastershipWatcher = contextChainMastershipWatcher; + public ListenableFuture closeServiceInstance() { + return stopGatheringData(); } @Override - public ListenableFuture closeServiceInstance() { - LOG.info("Stopping statistics context cluster services for node {}", deviceInfo); + public void close() { + Futures.addCallback(stopGatheringData(), new FutureCallback() { + @Override + public void onSuccess(final Void result) { + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); + } - return Futures.transform(Futures.immediateFuture(null), new Function() { - @Nullable @Override - public Void apply(@Nullable final Void input) { - schedulingEnabled = false; - stopGatheringData(); - return null; + public void onFailure(final Throwable throwable) { + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); } - }); + }, MoreExecutors.directExecutor()); } - @Override - public DeviceState gainDeviceState() { - return gainDeviceContext().getDeviceState(); - } + private ListenableFuture gatherDynamicData() { + if (!isStatisticsPollingOn || !schedulingEnabled.get()) { + LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue()); + return Futures.immediateFuture(Boolean.TRUE); + } - @Override - public DeviceContext gainDeviceContext() { - return this.deviceContext; + return this.lastDataGatheringRef.updateAndGet(future -> { + // write start timestamp to state snapshot container + StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext); + + // recreate gathering future if it should be recreated + final ListenableFuture lastDataGathering = future == null || future.isCancelled() + || future.isDone() ? Futures.immediateFuture(Boolean.TRUE) : future; + + // build statistics gathering future + final ListenableFuture newDataGathering = collectingStatType.stream() + .reduce(lastDataGathering, this::statChainFuture, + (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn, + MoreExecutors.directExecutor())); + + // write end timestamp to state snapshot container + Futures.addCallback(newDataGathering, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result); + } + + @Override + public void onFailure(final Throwable throwable) { + if (!(throwable instanceof TransactionChainClosedException)) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false); + } + } + }, MoreExecutors.directExecutor()); + + return newDataGathering; + }); } - @Override - public void stopGatheringData() { - LOG.info("Stopping running statistics gathering for node {}", deviceInfo); + private ListenableFuture statChainFuture(final ListenableFuture prevFuture, + final MultipartType multipartType) { + if (ConnectionContext.CONNECTION_STATE.RIP + .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + final String errMsg = String + .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s", + getDeviceInfo().getNodeId(), + deviceContext.getPrimaryConnectionContext().getConnectionState()); - if (Objects.nonNull(lastDataGathering) && !lastDataGathering.isDone() && !lastDataGathering.isCancelled()) { - lastDataGathering.cancel(true); + return Futures.immediateFailedFuture(new ConnectionException(errMsg)); } - if (Objects.nonNull(pollTimeout) && !pollTimeout.isExpired()) { - pollTimeout.cancel(); + return Futures.transformAsync(prevFuture, result -> { + LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result); + LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType); + final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType); + final boolean supported = collectingStatType.contains(multipartType); + + // TODO: Refactor twice sending deviceContext into gatheringStatistics + return supported ? StatisticsGatheringUtils + .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService, + getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor, + statisticsWriterProvider, executorService) : Futures + .immediateFuture(Boolean.FALSE); + }, MoreExecutors.directExecutor()); + } + + private void startGatheringData() { + if (!isStatisticsPollingOn) { + return; } + + LOG.info("Starting statistics gathering for node {}", deviceInfo); + final StatisticsPollingService statisticsPollingService = + new StatisticsPollingService(timeCounter, + statisticsPollingInterval, + maximumPollingDelay, + StatisticsContextImpl.this::gatherDynamicData); + + schedulingEnabled.set(true); + statisticsPollingService.startAsync(); + this.statisticsPollingServiceRef.set(statisticsPollingService); } - @Override - public boolean initialSubmitAfterReconciliation() { - final boolean submit = deviceContext.initialSubmitTransaction(); - if (submit) { - myManager.startScheduling(deviceInfo); + private ListenableFuture stopGatheringData() { + LOG.info("Stopping running statistics gathering for node {}", deviceInfo); + cancelLastDataGathering(); + + return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop) + .orElseGet(() -> Futures.immediateFuture(null)); + } + + private void cancelLastDataGathering() { + final ListenableFuture future = lastDataGatheringRef.getAndSet(null); + + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); } - return submit; } - @Override - public void instantiateServiceInstance() { - LOG.info("Starting statistics context cluster services for node {}", deviceInfo); - this.statListForCollectingInitialization(); + @VisibleForTesting + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { + this.statisticsGatheringService = statisticsGatheringService; + } - Futures.addCallback(this.gatherDynamicData(), new FutureCallback() { - @Override - public void onSuccess(@Nullable Boolean aBoolean) { - contextChainMastershipWatcher.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_GATHERING - ); - - if (!isUsingReconciliationFramework) { - if (deviceContext.initialSubmitTransaction()) { - contextChainMastershipWatcher.onMasterRoleAcquired( - deviceInfo, - ContextChainMastershipState.INITIAL_SUBMIT - ); - - if (isStatisticsPollingOn) { - myManager.startScheduling(deviceInfo); - } - } else { - contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial transaction cannot be submitted." - ); - } - } - } + @VisibleForTesting + void setStatisticsGatheringOnTheFlyService( + final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { + this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; + } - @Override - public void onFailure(@Nonnull Throwable throwable) { - contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory( - deviceInfo, - "Initial gathering statistics unsuccessful." - ); + private final class InitialSubmitCallback implements FutureCallback { + @Override + public void onSuccess(final Boolean result) { + if (!isUsingReconciliationFramework) { + continueInitializationAfterReconciliation(); } - }); - } + } - @Nonnull - @Override - public ServiceGroupIdentifier getIdentifier() { - return deviceInfo.getServiceIdentifier(); + @Override + public void onFailure(final Throwable throwable) { + contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo, + "Initial gathering statistics " + + "unsuccessful: " + + throwable.getMessage()); + } } }