X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=d95527476942ea2106d7cd13a22604a4f60a398a;hb=391a84bf97e3f2ffa550ef50276748a9145f2579;hp=144d664c2b631c5203b4b207a66e87de749576ec;hpb=d1300732c6082695390dc921336d191df48f8165;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 144d664c2b..d955274769 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -22,33 +23,37 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.mdsal.common.api.TransactionChainClosedException; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; -import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; +import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StatisticsContextImpl implements StatisticsContext { +class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; @@ -58,45 +63,51 @@ class StatisticsContextImpl implements StatisticsContext { private final DeviceContext deviceContext; private final DeviceState devState; private final ListenableFuture emptyFuture; - private final boolean shuttingDownStatisticsPolling; - private final Object COLLECTION_STAT_TYPE_LOCK = new Object(); - private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator; - @GuardedBy("COLLECTION_STAT_TYPE_LOCK") + private final boolean isStatisticsPollingOn; + private final Object collectionStatTypeLock = new Object(); + private final ConvertorExecutor convertorExecutor; + private final MultipartWriterProvider statisticsWriterProvider; + @GuardedBy("collectionStatTypeLock") private List collectingStatType; - private StatisticsGatheringService statisticsGatheringService; - private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; + private StatisticsGatheringService statisticsGatheringService; + private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; private Timeout pollTimeout; private final DeviceInfo deviceInfo; private final StatisticsManager myManager; - private final LifecycleService lifecycleService; private volatile boolean schedulingEnabled; private volatile CONTEXT_STATE state; + private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; + private ClusterInitializationPhaseHandler initialSubmitHandler; - StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo, - final boolean shuttingDownStatisticsPolling, - @Nonnull final LifecycleService lifecycleService, - @Nonnull final ConvertorExecutor convertorExecutor, - @Nonnull final StatisticsManager myManager) { - this.lifecycleService = lifecycleService; - this.deviceContext = lifecycleService.getDeviceContext(); + private ListenableFuture lastDataGathering; + + StatisticsContextImpl(final boolean isStatisticsPollingOn, + @Nonnull final DeviceContext deviceContext, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final StatisticsManager myManager, + @Nonnull final MultipartWriterProvider statisticsWriterProvider) { + this.deviceContext = deviceContext; this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); - this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling; - multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor); + this.isStatisticsPollingOn = isStatisticsPollingOn; + this.convertorExecutor = convertorExecutor; emptyFuture = Futures.immediateFuture(false); - statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor); + statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, + deviceContext, convertorExecutor, statisticsWriterProvider); itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); statListForCollectingInitialization(); - setState(CONTEXT_STATE.INITIALIZATION); - this.deviceInfo = deviceInfo; + this.state = CONTEXT_STATE.INITIALIZATION; + this.deviceInfo = deviceContext.getDeviceInfo(); this.myManager = myManager; + this.lastDataGathering = null; + this.statisticsWriterProvider = statisticsWriterProvider; } @Override public void statListForCollectingInitialization() { - synchronized (COLLECTION_STAT_TYPE_LOCK) { + synchronized (collectionStatTypeLock) { final List statListForCollecting = new ArrayList<>(); if (devState.isTableStatisticsAvailable()) { statListForCollecting.add(MultipartType.OFPMPTABLE); @@ -134,7 +145,8 @@ class StatisticsContextImpl implements StatisticsContext { } private ListenableFuture gatherDynamicData(final boolean initial) { - if (shuttingDownStatisticsPolling) { + this.lastDataGathering = null; + if (!isStatisticsPollingOn) { LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); return Futures.immediateFuture(Boolean.TRUE); } @@ -142,7 +154,7 @@ class StatisticsContextImpl implements StatisticsContext { if (errorResultFuture != null) { return errorResultFuture; } - synchronized (COLLECTION_STAT_TYPE_LOCK) { + synchronized (collectionStatTypeLock) { final Iterator statIterator = collectingStatType.iterator(); final SettableFuture settableStatResultFuture = SettableFuture.create(); @@ -164,32 +176,44 @@ class StatisticsContextImpl implements StatisticsContext { } } }); + this.lastDataGathering = settableStatResultFuture; return settableStatResultFuture; } } private ListenableFuture chooseStat(final MultipartType multipartType, final boolean initial){ + ListenableFuture result = Futures.immediateCheckedFuture(Boolean.TRUE); + switch (multipartType) { case OFPMPFLOW: - return collectFlowStatistics(multipartType, initial); + result = collectFlowStatistics(multipartType, initial); + break; case OFPMPTABLE: - return collectTableStatistics(multipartType); + result = collectTableStatistics(multipartType); + break; case OFPMPPORTSTATS: - return collectPortStatistics(multipartType); + result = collectPortStatistics(multipartType); + break; case OFPMPQUEUE: - return collectQueueStatistics(multipartType); + result = collectQueueStatistics(multipartType); + break; case OFPMPGROUPDESC: - return collectGroupDescStatistics(multipartType); + result = collectGroupDescStatistics(multipartType); + break; case OFPMPGROUP: - return collectGroupStatistics(multipartType); + result = collectGroupStatistics(multipartType); + break; case OFPMPMETERCONFIG: - return collectMeterConfigStatistics(multipartType); + result = collectMeterConfigStatistics(multipartType); + break; case OFPMPMETER: - return collectMeterStatistics(multipartType); + result = collectMeterStatistics(multipartType); + break; default: - LOG.warn("Unsuported Statistics type {}", multipartType); - return Futures.immediateCheckedFuture(Boolean.TRUE); + LOG.warn("Unsupported Statistics type {}", multipartType); } + + return result; } @@ -209,15 +233,22 @@ class StatisticsContextImpl implements StatisticsContext { public void close() { if (CONTEXT_STATE.TERMINATION.equals(getState())) { if (LOG.isDebugEnabled()) { - LOG.debug("Statistics context is already in state TERMINATION."); + LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); } } else { - setState(CONTEXT_STATE.TERMINATION); - schedulingEnabled = false; + try { + stopClusterServices(true).get(); + } catch (Exception e) { + LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e); + } + + this.state = CONTEXT_STATE.TERMINATION; + for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); iterator.hasNext(); ) { RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); } + if (null != pollTimeout && !pollTimeout.isExpired()) { pollTimeout.cancel(); } @@ -247,19 +278,19 @@ class StatisticsContextImpl implements StatisticsContext { private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture, final boolean initial) { if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { final String errMsg = String.format("Device connection is closed for Node : %s.", - getDeviceInfo().getNodeId()); + getDeviceInfo().getNodeId()); LOG.debug(errMsg); - resultFuture.setException(new IllegalStateException(errMsg)); + resultFuture.setException(new ConnectionException(errMsg)); return; } if ( ! iterator.hasNext()) { resultFuture.set(Boolean.TRUE); - LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId()); + LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue()); return; } final MultipartType nextType = iterator.next(); - LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType); + LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType); final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType, initial); Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { @@ -278,7 +309,7 @@ class StatisticsContextImpl implements StatisticsContext { * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture * which has to be returned from caller too * - * @return + * @return future */ @VisibleForTesting ListenableFuture deviceConnectionCheck() { @@ -287,7 +318,7 @@ class StatisticsContextImpl implements StatisticsContext { switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { case RIP: final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", - deviceContext.getPrimaryConnectionContext().getConnectionState()); + deviceContext.getPrimaryConnectionContext().getConnectionState()); resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); break; default: @@ -302,92 +333,107 @@ class StatisticsContextImpl implements StatisticsContext { //TODO: Refactor twice sending deviceContext into gatheringStatistics private ListenableFuture collectFlowStatistics(final MultipartType multipartType, final boolean initial) { return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringOnTheFlyService, - getDeviceInfo(), - /*MultipartType.OFPMPFLOW*/ multipartType, - deviceContext, - deviceContext, - initial, multipartReplyTranslator) : emptyFuture; + statisticsGatheringOnTheFlyService, + getDeviceInfo(), + /*MultipartType.OFPMPFLOW*/ multipartType, + deviceContext, + deviceContext, + initial, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectTableStatistics(final MultipartType multipartType) { return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPTABLE*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectPortStatistics(final MultipartType multipartType) { return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPPORTSTATS*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPQUEUE*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator); + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider); } private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPGROUPDESC*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPGROUP*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPMETERCONFIG*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, - getDeviceInfo(), + statisticsGatheringService, + getDeviceInfo(), /*MultipartType.OFPMPMETER*/ multipartType, - deviceContext, - deviceContext, - false, multipartReplyTranslator) : emptyFuture; + deviceContext, + deviceContext, + false, + convertorExecutor, + statisticsWriterProvider) : emptyFuture; } @VisibleForTesting - void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { this.statisticsGatheringService = statisticsGatheringService; } @VisibleForTesting - void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService - statisticsGatheringOnTheFlyService) { + void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } @@ -401,11 +447,6 @@ class StatisticsContextImpl implements StatisticsContext { return this.state; } - @Override - public void setState(CONTEXT_STATE state) { - this.state = state; - } - @Override public ServiceGroupIdentifier getServiceIdentifier() { return this.deviceInfo.getServiceIdentifier(); @@ -417,22 +458,77 @@ class StatisticsContextImpl implements StatisticsContext { } @Override - public void startupClusterServices() throws ExecutionException, InterruptedException { - if (!this.shuttingDownStatisticsPolling) { - this.statListForCollectingInitialization(); - this.initialGatherDynamicData(); - myManager.startScheduling(deviceInfo); + public ListenableFuture stopClusterServices() { + if (CONTEXT_STATE.TERMINATION.equals(this.state)) { + return Futures.immediateCancelledFuture(); } + + return Futures.transform(Futures.immediateFuture(null), new Function() { + @Nullable + @Override + public Void apply(@Nullable Object input) { + schedulingEnabled = false; + stopGatheringData(); + return null; + } + }); } @Override - public ListenableFuture stopClusterServices() { - myManager.stopScheduling(deviceInfo); - return Futures.immediateFuture(null); + public DeviceState gainDeviceState() { + return gainDeviceContext().getDeviceState(); + } + + @Override + public DeviceContext gainDeviceContext() { + return this.deviceContext; + } + + @Override + public void stopGatheringData() { + if (Objects.nonNull(this.lastDataGathering)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue()); + } + + lastDataGathering.cancel(true); + } + } + + @Override + public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { + this.clusterInitializationPhaseHandler = handler; + } + + @Override + public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) { + + LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue()); + + this.statListForCollectingInitialization(); + Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback() { + + @Override + public void onSuccess(@Nullable Boolean aBoolean) { + initialSubmitHandler.initialSubmitTransaction(); + } + + @Override + public void onFailure(@Nonnull Throwable throwable) { + LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue()); + mastershipChangeListener.onNotAbleToStartMastership(deviceInfo); + } + }); + + if (this.isStatisticsPollingOn) { + myManager.startScheduling(deviceInfo); + } + + return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener); } @Override - public LifecycleService getLifecycleService() { - return lifecycleService; + public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) { + this.initialSubmitHandler = initialSubmitHandler; } }