X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=68d349ad704af44855535dca37bb63887ba55646;hb=4bd43c307a24836ab1f7d6829d93d50e868e8ded;hp=737504a8d02c9b762c6c708952a2cae2913eb689;hpb=1ef16ba3454f65ee5fc3751b3bdb30fca06ed99e;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 737504a8d0..68d349ad70 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -8,124 +8,356 @@ package org.opendaylight.openflowplugin.impl.statistics; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import io.netty.util.Timeout; +import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl; +import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; +import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; +import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; +import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Created by Martin Bobak <mbobak@cisco.com> on 1.4.2015. - */ -public class StatisticsContextImpl implements StatisticsContext { +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); - private final List requestContexts = new ArrayList(); + private static final String CONNECTION_CLOSED = "Connection closed."; + + private final ItemLifecycleListener itemLifeCycleListener; + private final Collection> requestContexts = new HashSet<>(); private final DeviceContext deviceContext; + private final DeviceState devState; + private final ListenableFuture emptyFuture; + private final boolean shuttingDownStatisticsPolling; + private final Object COLLECTION_STAT_TYPE_LOCK = new Object(); + @GuardedBy("COLLECTION_STAT_TYPE_LOCK") + private List collectingStatType; + private StatisticsGatheringService statisticsGatheringService; + private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; + private Timeout pollTimeout; - private final StatisticsGatheringService statisticsGatheringService; + private volatile boolean schedulingEnabled; - public StatisticsContextImpl(final DeviceContext deviceContext) { - this.deviceContext = deviceContext; + StatisticsContextImpl(@CheckForNull final DeviceInfo deviceInfo, final boolean shuttingDownStatisticsPolling, final LifecycleConductor lifecycleConductor) { + this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo)); + this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling; + emptyFuture = Futures.immediateFuture(false); statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); - + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext); + itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); + statListForCollectingInitialization(); } - private void pollFlowStatistics() { - final KeyedInstanceIdentifier nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(deviceContext.getPrimaryConnectionContext().getNodeId())); - final NodeRef nodeRef = new NodeRef(nodeII); - final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder = - new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); - builder.setNode(nodeRef); - //TODO : process data from result + @Override + public void statListForCollectingInitialization() { + synchronized (COLLECTION_STAT_TYPE_LOCK) { + final List statListForCollecting = new ArrayList<>(); + if (devState.isTableStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPTABLE); + } + if (devState.isFlowStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } + if (devState.isGroupAvailable()) { + statListForCollecting.add(MultipartType.OFPMPGROUPDESC); + statListForCollecting.add(MultipartType.OFPMPGROUP); + } + if (devState.isMetersAvailable()) { + statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); + statListForCollecting.add(MultipartType.OFPMPMETER); + } + if (devState.isPortStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPPORTSTATS); + } + if (devState.isQueueStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPQUEUE); + } + collectingStatType = ImmutableList.copyOf(statListForCollecting); + } } @Override - public ListenableFuture gatherDynamicData() { + public ListenableFuture gatherDynamicData() { + if (shuttingDownStatisticsPolling) { + LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceInfo().getNodeId()); + return Futures.immediateFuture(Boolean.TRUE); + } + final ListenableFuture errorResultFuture = deviceConnectionCheck(); + if (errorResultFuture != null) { + return errorResultFuture; + } + synchronized (COLLECTION_STAT_TYPE_LOCK) { + final Iterator statIterator = collectingStatType.iterator(); + final SettableFuture settableStatResultFuture = SettableFuture.create(); - final DeviceState devState = deviceContext.getDeviceState(); + // write start timestamp to state snapshot container + StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext); - ListenableFuture emptyFuture = Futures.immediateFuture(null); - final ListenableFuture flowStatistics = devState.isFlowStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPFLOW) : emptyFuture; + statChainFuture(statIterator, settableStatResultFuture); - final ListenableFuture tableStatistics = devState.isTableStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPTABLE) : emptyFuture; + // write end timestamp to state snapshot container + Futures.addCallback(settableStatResultFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Boolean result) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true); + } + @Override + public void onFailure(final Throwable t) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); + } + }); + return settableStatResultFuture; + } + } - final ListenableFuture portStatistics = devState.isPortStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPPORTSTATS) : emptyFuture; + private ListenableFuture chooseStat(final MultipartType multipartType){ + switch (multipartType) { + case OFPMPFLOW: + return collectFlowStatistics(multipartType); + case OFPMPTABLE: + return collectTableStatistics(multipartType); + case OFPMPPORTSTATS: + return collectPortStatistics(multipartType); + case OFPMPQUEUE: + return collectQueueStatistics(multipartType); + case OFPMPGROUPDESC: + return collectGroupDescStatistics(multipartType); + case OFPMPGROUP: + return collectGroupStatistics(multipartType); + case OFPMPMETERCONFIG: + return collectMeterConfigStatistics(multipartType); + case OFPMPMETER: + return collectMeterStatistics(multipartType); + default: + LOG.warn("Unsuported Statistics type {}", multipartType); + return Futures.immediateCheckedFuture(Boolean.TRUE); + } + } - final ListenableFuture queueStatistics = devState.isQueueStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPQUEUE) : emptyFuture; - final ListenableFuture groupDescStatistics = devState.isGroupAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPGROUPDESC) : emptyFuture; - final ListenableFuture groupStatistics = devState.isGroupAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPGROUP) : emptyFuture; + @Override + public RequestContext createRequestContext() { + final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.reserveXidForDeviceMessage()) { + @Override + public void close() { + requestContexts.remove(this); + } + }; + requestContexts.add(ret); + return ret; + } - final ListenableFuture meterConfigStatistics = devState.isMetersAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPMETERCONFIG) : emptyFuture; - final ListenableFuture meterStatistics = devState.isMetersAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPMETER) : emptyFuture; + @Override + public void close() { + schedulingEnabled = false; + for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); + iterator.hasNext();) { + RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); + } + if (null != pollTimeout && !pollTimeout.isExpired()) { + pollTimeout.cancel(); + } + } + @Override + public void setSchedulingEnabled(final boolean schedulingEnabled) { + this.schedulingEnabled = schedulingEnabled; + } - final ListenableFuture> allFutures = Futures.allAsList(Arrays.asList(flowStatistics, tableStatistics, groupDescStatistics, groupStatistics, meterConfigStatistics, meterStatistics, portStatistics, queueStatistics)); - final SettableFuture resultingFuture = SettableFuture.create(); - Futures.addCallback(allFutures, new FutureCallback>() { + @Override + public boolean isSchedulingEnabled() { + return schedulingEnabled; + } + + @Override + public void setPollTimeout(final Timeout pollTimeout) { + this.pollTimeout = pollTimeout; + } + + @Override + public Optional getPollTimeout() { + return Optional.ofNullable(pollTimeout); + } + + private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + final String errMsg = String.format("Device connection is closed for Node : %s.", + deviceContext.getDeviceInfo().getNodeId()); + LOG.debug(errMsg); + resultFuture.setException(new IllegalStateException(errMsg)); + return; + } + if ( ! iterator.hasNext()) { + resultFuture.set(Boolean.TRUE); + LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceInfo().getNodeId()); + return; + } + + final MultipartType nextType = iterator.next(); + LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceInfo().getNodeId(), nextType); + + final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType); + Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { @Override - public void onSuccess(final List booleans) { - resultingFuture.set(null); + public void onSuccess(final Boolean result) { + statChainFuture(iterator, resultFuture); } - @Override - public void onFailure(final Throwable throwable) { - resultingFuture.setException(throwable); + public void onFailure(@Nonnull final Throwable t) { + resultFuture.setException(t); } }); - return resultingFuture; } - private ListenableFuture wrapLoggingOnStatisticsRequestCall(final MultipartType type) { - final ListenableFuture future = StatisticsGatheringUtils.gatherStatistics(statisticsGatheringService, deviceContext, type); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Object o) { - LOG.trace("Multipart response for {} was successful.", type); + /** + * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture + * which has to be returned from caller too + * + * @return + */ + @VisibleForTesting + ListenableFuture deviceConnectionCheck() { + if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + ListenableFuture resultingFuture = SettableFuture.create(); + switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { + case RIP: + final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", + deviceContext.getPrimaryConnectionContext().getConnectionState()); + resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); + break; + default: + resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE); + break; } + return resultingFuture; + } + return null; + } - @Override - public void onFailure(final Throwable throwable) { - LOG.trace("Multipart response for {} FAILED.", type, throwable); - } - }); - return future; + //TODO: Refactor twice sending deviceContext into gatheringStatistics + private ListenableFuture collectFlowStatistics(final MultipartType multipartType) { + return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringOnTheFlyService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPFLOW*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; } - @Override - public void forgetRequestContext(final RequestContext requestContext) { - requestContexts.remove(requestContexts); + private ListenableFuture collectTableStatistics(final MultipartType multipartType) { + return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPTABLE*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; } - @Override - public SettableFuture> storeOrFail(final RequestContext data) { - requestContexts.add(data); - return data.getFuture(); + private ListenableFuture collectPortStatistics(final MultipartType multipartType) { + return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPPORTSTATS*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; + } + + private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { + return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPQUEUE*/ multipartType, + deviceContext, + deviceContext, + devState); + } + + private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPGROUPDESC*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; + } + + private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPGROUP*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; + } + + private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPMETERCONFIG*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; + } + + private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + deviceContext.getDeviceInfo(), + /*MultipartType.OFPMPMETER*/ multipartType, + deviceContext, + deviceContext, + devState) : emptyFuture; + } + + @VisibleForTesting + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { + this.statisticsGatheringService = statisticsGatheringService; + } + + @VisibleForTesting + void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService + statisticsGatheringOnTheFlyService) { + this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } @Override - public RequestContext createRequestContext() { - return new RequestContextImpl<>(this); + public ItemLifecycleListener getItemLifeCycleListener () { + return itemLifeCycleListener; } + }