X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=3002b8e632d83e53bb23b5d0f6f6376c92605cf0;hb=d1af0fd5a4053a10917f631bae42970c1960fd20;hp=8fe6bb4b52edaa197a903d0ae4586addbb745671;hpb=7fa33fd58abd5d38788581d9207e972dfd3eb592;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 8fe6bb4b52..3002b8e632 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -22,24 +23,30 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; -import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,33 +61,50 @@ class StatisticsContextImpl implements StatisticsContext { private final DeviceContext deviceContext; private final DeviceState devState; private final ListenableFuture emptyFuture; - private final boolean shuttingDownStatisticsPolling; - private final Object COLLECTION_STAT_TYPE_LOCK = new Object(); - @GuardedBy("COLLECTION_STAT_TYPE_LOCK") + private final boolean isStatisticsPollingOn; + private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator; + private final Object collectionStatTypeLock = new Object(); + @GuardedBy("collectionStatTypeLock") private List collectingStatType; private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; private Timeout pollTimeout; + private final DeviceInfo deviceInfo; + private final StatisticsManager myManager; + private final LifecycleService lifecycleService; private volatile boolean schedulingEnabled; - private volatile CONTEXT_STATE contextState; - - StatisticsContextImpl(@CheckForNull final DeviceInfo deviceInfo, final boolean shuttingDownStatisticsPolling, final LifecycleConductor lifecycleConductor) { - this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo)); + private volatile CONTEXT_STATE state; + private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; + private ClusterInitializationPhaseHandler initialSubmitHandler; + + private ListenableFuture lastDataGathering; + + StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo, + final boolean isStatisticsPollingOn, + @Nonnull final LifecycleService lifecycleService, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final StatisticsManager myManager) { + this.lifecycleService = lifecycleService; + this.deviceContext = lifecycleService.getDeviceContext(); this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); - this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling; + this.isStatisticsPollingOn = isStatisticsPollingOn; + multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor); emptyFuture = Futures.immediateFuture(false); statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor); itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); statListForCollectingInitialization(); - contextState = CONTEXT_STATE.WORKING; + this.state = CONTEXT_STATE.INITIALIZATION; + this.deviceInfo = deviceInfo; + this.myManager = myManager; + this.lastDataGathering = null; } @Override public void statListForCollectingInitialization() { - synchronized (COLLECTION_STAT_TYPE_LOCK) { + synchronized (collectionStatTypeLock) { final List statListForCollecting = new ArrayList<>(); if (devState.isTableStatisticsAvailable()) { statListForCollecting.add(MultipartType.OFPMPTABLE); @@ -118,15 +142,16 @@ class StatisticsContextImpl implements StatisticsContext { } private ListenableFuture gatherDynamicData(final boolean initial) { - if (shuttingDownStatisticsPolling) { - LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceInfo().getNodeId()); + this.lastDataGathering = null; + if (!isStatisticsPollingOn) { + LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); return Futures.immediateFuture(Boolean.TRUE); } final ListenableFuture errorResultFuture = deviceConnectionCheck(); if (errorResultFuture != null) { return errorResultFuture; } - synchronized (COLLECTION_STAT_TYPE_LOCK) { + synchronized (collectionStatTypeLock) { final Iterator statIterator = collectingStatType.iterator(); final SettableFuture settableStatResultFuture = SettableFuture.create(); @@ -143,41 +168,55 @@ class StatisticsContextImpl implements StatisticsContext { } @Override public void onFailure(final Throwable t) { - StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); + if (!(t instanceof TransactionChainClosedException)) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); + } } }); + this.lastDataGathering = settableStatResultFuture; return settableStatResultFuture; } } private ListenableFuture chooseStat(final MultipartType multipartType, final boolean initial){ + ListenableFuture result = Futures.immediateCheckedFuture(Boolean.TRUE); + switch (multipartType) { case OFPMPFLOW: - return collectFlowStatistics(multipartType, initial); + result = collectFlowStatistics(multipartType, initial); + break; case OFPMPTABLE: - return collectTableStatistics(multipartType); + result = collectTableStatistics(multipartType); + break; case OFPMPPORTSTATS: - return collectPortStatistics(multipartType); + result = collectPortStatistics(multipartType); + break; case OFPMPQUEUE: - return collectQueueStatistics(multipartType); + result = collectQueueStatistics(multipartType); + break; case OFPMPGROUPDESC: - return collectGroupDescStatistics(multipartType); + result = collectGroupDescStatistics(multipartType); + break; case OFPMPGROUP: - return collectGroupStatistics(multipartType); + result = collectGroupStatistics(multipartType); + break; case OFPMPMETERCONFIG: - return collectMeterConfigStatistics(multipartType); + result = collectMeterConfigStatistics(multipartType); + break; case OFPMPMETER: - return collectMeterStatistics(multipartType); + result = collectMeterStatistics(multipartType); + break; default: - LOG.warn("Unsuported Statistics type {}", multipartType); - return Futures.immediateCheckedFuture(Boolean.TRUE); + LOG.warn("Unsupported Statistics type {}", multipartType); } + + return result; } @Override public RequestContext createRequestContext() { - final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.reserveXidForDeviceMessage()) { + final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { @Override public void close() { requestContexts.remove(this); @@ -189,17 +228,24 @@ class StatisticsContextImpl implements StatisticsContext { @Override public void close() { - if (CONTEXT_STATE.TERMINATION.equals(contextState)) { + if (CONTEXT_STATE.TERMINATION.equals(getState())) { if (LOG.isDebugEnabled()) { - LOG.debug("Statistics context is already in state TERMINATION."); + LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); } } else { - contextState = CONTEXT_STATE.TERMINATION; - schedulingEnabled = false; + try { + stopClusterServices().get(); + } catch (Exception e) { + LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e); + } + + this.state = CONTEXT_STATE.TERMINATION; + for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); iterator.hasNext(); ) { RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); } + if (null != pollTimeout && !pollTimeout.isExpired()) { pollTimeout.cancel(); } @@ -229,19 +275,19 @@ class StatisticsContextImpl implements StatisticsContext { private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture, final boolean initial) { if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { final String errMsg = String.format("Device connection is closed for Node : %s.", - deviceContext.getDeviceInfo().getNodeId()); + getDeviceInfo().getNodeId()); LOG.debug(errMsg); - resultFuture.setException(new IllegalStateException(errMsg)); + resultFuture.setException(new ConnectionException(errMsg)); return; } if ( ! iterator.hasNext()) { resultFuture.set(Boolean.TRUE); - LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceInfo().getNodeId()); + LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue()); return; } final MultipartType nextType = iterator.next(); - LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceInfo().getNodeId(), nextType); + LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType); final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType, initial); Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { @@ -285,81 +331,81 @@ class StatisticsContextImpl implements StatisticsContext { private ListenableFuture collectFlowStatistics(final MultipartType multipartType, final boolean initial) { return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringOnTheFlyService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPFLOW*/ multipartType, deviceContext, deviceContext, - initial) : emptyFuture; + initial, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectTableStatistics(final MultipartType multipartType) { return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPTABLE*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectPortStatistics(final MultipartType multipartType) { return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPPORTSTATS*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPQUEUE*/ multipartType, deviceContext, deviceContext, - false); + false, multipartReplyTranslator); } private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPGROUPDESC*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPGROUP*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPMETERCONFIG*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( statisticsGatheringService, - deviceContext.getDeviceInfo(), + getDeviceInfo(), /*MultipartType.OFPMPMETER*/ multipartType, deviceContext, deviceContext, - false) : emptyFuture; + false, multipartReplyTranslator) : emptyFuture; } @VisibleForTesting @@ -380,6 +426,95 @@ class StatisticsContextImpl implements StatisticsContext { @Override public CONTEXT_STATE getState() { - return contextState; + return this.state; + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; + } + + @Override + public ListenableFuture stopClusterServices() { + if (CONTEXT_STATE.TERMINATION.equals(getState())) { + return Futures.immediateCancelledFuture(); + } + + return Futures.transform(Futures.immediateFuture(null), new Function() { + @Nullable + @Override + public Void apply(@Nullable Object input) { + schedulingEnabled = false; + stopGatheringData(); + return null; + } + }); + } + + @Override + public DeviceState gainDeviceState() { + return gainDeviceContext().getDeviceState(); + } + + @Override + public DeviceContext gainDeviceContext() { + return this.lifecycleService.getDeviceContext(); + } + + @Override + public void stopGatheringData() { + if (Objects.nonNull(this.lastDataGathering)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue()); + } + + lastDataGathering.cancel(true); + } + } + + @Override + public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { + this.clusterInitializationPhaseHandler = handler; + } + + @Override + public boolean onContextInstantiateService(final ConnectionContext connectionContext) { + if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) { + LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue()); + return false; + } + + LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue()); + + this.statListForCollectingInitialization(); + Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback() { + + @Override + public void onSuccess(@Nullable Boolean aBoolean) { + initialSubmitHandler.initialSubmitTransaction(); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue()); + lifecycleService.closeConnection(); + } + }); + + if (this.isStatisticsPollingOn) { + myManager.startScheduling(deviceInfo); + } + + return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext); + } + + @Override + public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) { + this.initialSubmitHandler = initialSubmitHandler; } -} +} \ No newline at end of file