X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=301d0061fbd97c226ea5ce413489da571eca6589;hb=233568bf5702295c73438fe9e67a2c011c539641;hp=6f4f2327c2e696170eee880a14d12267205b331f;hpb=565cdfaa2958d520a1c5e139b7789333bc077ea4;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 6f4f2327c2..301d0061fb 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -8,30 +8,33 @@ package org.opendaylight.openflowplugin.impl.statistics; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.netty.util.Timeout; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import javax.annotation.CheckForNull; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl; +import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; +import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; +import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,112 +44,215 @@ import org.slf4j.LoggerFactory; public class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); - public static final String CONNECTION_CLOSED = "Connection closed."; - private final List requestContexts = new ArrayList(); - private final DeviceContext deviceContext; + private static final String CONNECTION_CLOSED = "Connection closed."; + private final ItemLifecycleListener itemLifeCycleListener; + private final Collection> requestContexts = new HashSet<>(); + private final DeviceContext deviceContext; + private final DeviceState devState; + private final ListenableFuture emptyFuture; + private final List collectingStatType; - private final StatisticsGatheringService statisticsGatheringService; + private StatisticsGatheringService statisticsGatheringService; + private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; + private Timeout pollTimeout; - public StatisticsContextImpl(final DeviceContext deviceContext) { - this.deviceContext = deviceContext; + public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) { + this.deviceContext = Preconditions.checkNotNull(deviceContext); + devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + emptyFuture = Futures.immediateFuture(new Boolean(false)); statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext); - } - - private void pollFlowStatistics() { - final KeyedInstanceIdentifier nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(deviceContext.getPrimaryConnectionContext().getNodeId())); - final NodeRef nodeRef = new NodeRef(nodeII); - final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder = - new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); - builder.setNode(nodeRef); - //TODO : process data from result + final List statListForCollecting = new ArrayList<>(); + if (devState.isTableStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPTABLE); + } + if (devState.isFlowStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } + if (devState.isGroupAvailable()) { + statListForCollecting.add(MultipartType.OFPMPGROUPDESC); + statListForCollecting.add(MultipartType.OFPMPGROUP); + } + if (devState.isMetersAvailable()) { + statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); + statListForCollecting.add(MultipartType.OFPMPMETER); + } + if (devState.isPortStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPPORTSTATS); + } + if (devState.isQueueStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPQUEUE); + } + collectingStatType = ImmutableList.copyOf(statListForCollecting); + itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); } @Override public ListenableFuture gatherDynamicData() { + final ListenableFuture errorResultFuture = deviceConnectionCheck(); + if (errorResultFuture != null) { + return errorResultFuture; + } + final Iterator statIterator = collectingStatType.iterator(); + final SettableFuture settableStatResultFuture = SettableFuture.create(); + statChainFuture(statIterator, settableStatResultFuture); + return settableStatResultFuture; + } + + private ListenableFuture chooseStat(final MultipartType multipartType) { + switch (multipartType) { + case OFPMPFLOW: + return collectFlowStatistics(multipartType); + case OFPMPTABLE: + return collectTableStatistics(multipartType); + case OFPMPPORTSTATS: + return collectPortStatistics(multipartType); + case OFPMPQUEUE: + return collectQueueStatistics(multipartType); + case OFPMPGROUPDESC: + return collectGroupDescStatistics(multipartType); + case OFPMPGROUP: + return collectGroupStatistics(multipartType); + case OFPMPMETERCONFIG: + return collectMeterConfigStatistics(multipartType); + case OFPMPMETER: + return collectMeterStatistics(multipartType); + default: + LOG.warn("Unsuported Statistics type {}", multipartType); + return Futures.immediateCheckedFuture(Boolean.TRUE); + } + } + @Override + public RequestContext createRequestContext() { + final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.getReservedXid()) { + @Override + public void close() { + requestContexts.remove(this); + } + }; + requestContexts.add(ret); + return ret; + } - final ListenableFuture resultingFuture; + @Override + public void close() { + for (final RequestContext requestContext : requestContexts) { + RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED); + } + if (null != pollTimeout && !pollTimeout.isExpired()) { + pollTimeout.cancel(); + } + } - if (ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - final DeviceState devState = deviceContext.getDeviceState(); - final SettableFuture settableResultingFuture = SettableFuture.create(); - resultingFuture = settableResultingFuture; - ListenableFuture emptyFuture = Futures.immediateFuture(new Boolean(false)); - final ListenableFuture flowStatistics = devState.isFlowStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPFLOW) : emptyFuture; + @Override + public void setPollTimeout(Timeout pollTimeout) { + this.pollTimeout = pollTimeout; + } - final ListenableFuture tableStatistics = devState.isTableStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPTABLE) : emptyFuture; + @Override + public Optional getPollTimeout() { + return Optional.fromNullable(pollTimeout); + } - final ListenableFuture portStatistics = devState.isPortStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPPORTSTATS) : emptyFuture; + void statChainFuture(final Iterator iterator, final SettableFuture resultFuture) { + if ( ! iterator.hasNext()) { + resultFuture.set(Boolean.TRUE); + return; + } + final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(iterator.next()); + Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + statChainFuture(iterator, resultFuture); + } + @Override + public void onFailure(final Throwable t) { + resultFuture.setException(t); + } + }); + } - final ListenableFuture queueStatistics = devState.isQueueStatisticsAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPQUEUE) : emptyFuture; + /** + * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture + * which has to be returned from caller too + * + * @return + */ + @VisibleForTesting + ListenableFuture deviceConnectionCheck() { + if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + ListenableFuture resultingFuture = SettableFuture.create(); + switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { + case RIP: + final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", + deviceContext.getPrimaryConnectionContext().getConnectionState()); + resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); + break; + default: + resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE); + break; + } + return resultingFuture; + } + return null; + } - final ListenableFuture groupDescStatistics = devState.isGroupAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPGROUPDESC) : emptyFuture; - final ListenableFuture groupStatistics = devState.isGroupAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPGROUP) : emptyFuture; + private ListenableFuture collectFlowStatistics(final MultipartType multipartType) { + return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture; + } - final ListenableFuture meterConfigStatistics = devState.isMetersAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPMETERCONFIG) : emptyFuture; - final ListenableFuture meterStatistics = devState.isMetersAvailable() ? wrapLoggingOnStatisticsRequestCall(MultipartType.OFPMPMETER) : emptyFuture; + private ListenableFuture collectTableStatistics(final MultipartType multipartType) { + return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture; + } + private ListenableFuture collectPortStatistics(final MultipartType multipartType) { + return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture; + } - final ListenableFuture> allFutures = Futures.allAsList(Arrays.asList(flowStatistics, tableStatistics, groupDescStatistics, groupStatistics, meterConfigStatistics, meterStatistics, portStatistics, queueStatistics)); - Futures.addCallback(allFutures, new FutureCallback>() { - @Override - public void onSuccess(final List booleans) { - boolean atLeastOneSuccess = false; - for (Boolean bool : booleans) { - atLeastOneSuccess |= bool.booleanValue(); - } - settableResultingFuture.set(new Boolean(atLeastOneSuccess)); - } + private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { + return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture; + } - @Override - public void onFailure(final Throwable throwable) { - settableResultingFuture.setException(throwable); - } - }); - } else { - resultingFuture = Futures.immediateFailedFuture(new Throwable(String.format("Device connection doesn't exist anymore. Primary connection status : %s", deviceContext.getPrimaryConnectionContext().getConnectionState()))); - } - return resultingFuture; + private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture; } - private ListenableFuture wrapLoggingOnStatisticsRequestCall(final MultipartType type) { - final ListenableFuture future = StatisticsGatheringUtils.gatherStatistics(statisticsGatheringService, deviceContext, type); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Object o) { - LOG.trace("Multipart response for {} was successful.", type); - } + private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture; + } - @Override - public void onFailure(final Throwable throwable) { - LOG.trace("Multipart response for {} FAILED.", type, throwable); - } - }); - return future; + private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture; } - @Override - public void forgetRequestContext(final RequestContext requestContext) { - requestContexts.remove(requestContexts); + private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture; } - @Override - public SettableFuture> storeOrFail(final RequestContext data) { - requestContexts.add(data); - return data.getFuture(); + @VisibleForTesting + protected void setStatisticsGatheringService(StatisticsGatheringService statisticsGatheringService) { + this.statisticsGatheringService = statisticsGatheringService; } - @Override - public RequestContext createRequestContext() { - return new RequestContextImpl<>(this); + @VisibleForTesting + protected void setStatisticsGatheringOnTheFlyService(StatisticsGatheringOnTheFlyService + statisticsGatheringOnTheFlyService) { + this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } @Override - public void close() throws Exception { - for (RequestContext requestContext : requestContexts) { - RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED); - } + public ItemLifecycleListener getItemLifeCycleListener() { + return itemLifeCycleListener; } }