X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=9c8755e83de2ff7f07644bc8130366d5e809a362;hb=2578bbbc093ee26fa014ed2c115f6be76af096e2;hp=301d0061fbd97c226ea5ce413489da571eca6589;hpb=2dc098efe7e6359fe61d2b98472967bfd03f4c86;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index 301d0061fb..9c8755e83d 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -9,9 +9,9 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -22,26 +22,33 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import javax.annotation.CheckForNull; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Created by Martin Bobak <mbobak@cisco.com> on 1.4.2015. - */ -public class StatisticsContextImpl implements StatisticsContext { +class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; @@ -51,60 +58,120 @@ public class StatisticsContextImpl implements StatisticsContext { private final DeviceContext deviceContext; private final DeviceState devState; private final ListenableFuture emptyFuture; - private final List collectingStatType; + private final boolean shuttingDownStatisticsPolling; + private final Object COLLECTION_STAT_TYPE_LOCK = new Object(); + private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator; + @GuardedBy("COLLECTION_STAT_TYPE_LOCK") + private List collectingStatType; private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; private Timeout pollTimeout; + private final DeviceInfo deviceInfo; + private final StatisticsManager myManager; + private final LifecycleService lifecycleService; + + private volatile boolean schedulingEnabled; + private volatile CONTEXT_STATE state; - public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) { - this.deviceContext = Preconditions.checkNotNull(deviceContext); - devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); - emptyFuture = Futures.immediateFuture(new Boolean(false)); + StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo, + final boolean shuttingDownStatisticsPolling, + @Nonnull final LifecycleService lifecycleService, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final StatisticsManager myManager) { + this.lifecycleService = lifecycleService; + this.deviceContext = lifecycleService.getDeviceContext(); + this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); + this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling; + multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor); + emptyFuture = Futures.immediateFuture(false); statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor); + itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); + statListForCollectingInitialization(); + setState(CONTEXT_STATE.INITIALIZATION); + this.deviceInfo = deviceInfo; + this.myManager = myManager; + } - final List statListForCollecting = new ArrayList<>(); - if (devState.isTableStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPTABLE); - } - if (devState.isFlowStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPFLOW); - } - if (devState.isGroupAvailable()) { - statListForCollecting.add(MultipartType.OFPMPGROUPDESC); - statListForCollecting.add(MultipartType.OFPMPGROUP); - } - if (devState.isMetersAvailable()) { - statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); - statListForCollecting.add(MultipartType.OFPMPMETER); - } - if (devState.isPortStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPPORTSTATS); - } - if (devState.isQueueStatisticsAvailable()) { - statListForCollecting.add(MultipartType.OFPMPQUEUE); + @Override + public void statListForCollectingInitialization() { + synchronized (COLLECTION_STAT_TYPE_LOCK) { + final List statListForCollecting = new ArrayList<>(); + if (devState.isTableStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPTABLE); + } + if (devState.isFlowStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPFLOW); + } + if (devState.isGroupAvailable()) { + statListForCollecting.add(MultipartType.OFPMPGROUPDESC); + statListForCollecting.add(MultipartType.OFPMPGROUP); + } + if (devState.isMetersAvailable()) { + statListForCollecting.add(MultipartType.OFPMPMETERCONFIG); + statListForCollecting.add(MultipartType.OFPMPMETER); + } + if (devState.isPortStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPPORTSTATS); + } + if (devState.isQueueStatisticsAvailable()) { + statListForCollecting.add(MultipartType.OFPMPQUEUE); + } + collectingStatType = ImmutableList.copyOf(statListForCollecting); } - collectingStatType = ImmutableList.copyOf(statListForCollecting); - itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); + } + + + @Override + public ListenableFuture initialGatherDynamicData() { + return gatherDynamicData(true); } @Override - public ListenableFuture gatherDynamicData() { + public ListenableFuture gatherDynamicData(){ + return gatherDynamicData(false); + } + + private ListenableFuture gatherDynamicData(final boolean initial) { + if (shuttingDownStatisticsPolling) { + LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); + return Futures.immediateFuture(Boolean.TRUE); + } final ListenableFuture errorResultFuture = deviceConnectionCheck(); if (errorResultFuture != null) { return errorResultFuture; } - final Iterator statIterator = collectingStatType.iterator(); - final SettableFuture settableStatResultFuture = SettableFuture.create(); - statChainFuture(statIterator, settableStatResultFuture); - return settableStatResultFuture; + synchronized (COLLECTION_STAT_TYPE_LOCK) { + final Iterator statIterator = collectingStatType.iterator(); + final SettableFuture settableStatResultFuture = SettableFuture.create(); + + // write start timestamp to state snapshot container + StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext); + + statChainFuture(statIterator, settableStatResultFuture, initial); + + // write end timestamp to state snapshot container + Futures.addCallback(settableStatResultFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Boolean result) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true); + } + @Override + public void onFailure(final Throwable t) { + if (!(t instanceof TransactionChainClosedException)) { + StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false); + } + } + }); + return settableStatResultFuture; + } } - private ListenableFuture chooseStat(final MultipartType multipartType) { + private ListenableFuture chooseStat(final MultipartType multipartType, final boolean initial){ switch (multipartType) { case OFPMPFLOW: - return collectFlowStatistics(multipartType); + return collectFlowStatistics(multipartType, initial); case OFPMPTABLE: return collectTableStatistics(multipartType); case OFPMPPORTSTATS: @@ -125,9 +192,10 @@ public class StatisticsContextImpl implements StatisticsContext { } } + @Override public RequestContext createRequestContext() { - final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.getReservedXid()) { + final AbstractRequestContext ret = new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { @Override public void close() { requestContexts.remove(this); @@ -139,37 +207,68 @@ public class StatisticsContextImpl implements StatisticsContext { @Override public void close() { - for (final RequestContext requestContext : requestContexts) { - RequestContextUtil.closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED); - } - if (null != pollTimeout && !pollTimeout.isExpired()) { - pollTimeout.cancel(); + if (CONTEXT_STATE.TERMINATION.equals(getState())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Statistics context is already in state TERMINATION."); + } + } else { + setState(CONTEXT_STATE.TERMINATION); + schedulingEnabled = false; + for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); + iterator.hasNext(); ) { + RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); + } + if (null != pollTimeout && !pollTimeout.isExpired()) { + pollTimeout.cancel(); + } } } @Override - public void setPollTimeout(Timeout pollTimeout) { + public void setSchedulingEnabled(final boolean schedulingEnabled) { + this.schedulingEnabled = schedulingEnabled; + } + + @Override + public boolean isSchedulingEnabled() { + return schedulingEnabled; + } + + @Override + public void setPollTimeout(final Timeout pollTimeout) { this.pollTimeout = pollTimeout; } @Override public Optional getPollTimeout() { - return Optional.fromNullable(pollTimeout); + return Optional.ofNullable(pollTimeout); } - void statChainFuture(final Iterator iterator, final SettableFuture resultFuture) { + private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture, final boolean initial) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + final String errMsg = String.format("Device connection is closed for Node : %s.", + getDeviceInfo().getNodeId()); + LOG.debug(errMsg); + resultFuture.setException(new IllegalStateException(errMsg)); + return; + } if ( ! iterator.hasNext()) { resultFuture.set(Boolean.TRUE); + LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId()); return; } - final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(iterator.next()); + + final MultipartType nextType = iterator.next(); + LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType); + + final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType, initial); Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { @Override public void onSuccess(final Boolean result) { - statChainFuture(iterator, resultFuture); + statChainFuture(iterator, resultFuture, initial); } @Override - public void onFailure(final Throwable t) { + public void onFailure(@Nonnull final Throwable t) { resultFuture.setException(t); } }); @@ -184,7 +283,7 @@ public class StatisticsContextImpl implements StatisticsContext { @VisibleForTesting ListenableFuture deviceConnectionCheck() { if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - ListenableFuture resultingFuture = SettableFuture.create(); + ListenableFuture resultingFuture; switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { case RIP: final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", @@ -200,59 +299,140 @@ public class StatisticsContextImpl implements StatisticsContext { return null; } - private ListenableFuture collectFlowStatistics(final MultipartType multipartType) { + //TODO: Refactor twice sending deviceContext into gatheringStatistics + private ListenableFuture collectFlowStatistics(final MultipartType multipartType, final boolean initial) { return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture; + statisticsGatheringOnTheFlyService, + getDeviceInfo(), + /*MultipartType.OFPMPFLOW*/ multipartType, + deviceContext, + deviceContext, + initial, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectTableStatistics(final MultipartType multipartType) { return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPTABLE*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectPortStatistics(final MultipartType multipartType) { return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPPORTSTATS*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { - return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture; + return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPQUEUE*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator); } private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPGROUPDESC*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPGROUP*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPMETERCONFIG*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture; + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPMETER*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; } @VisibleForTesting - protected void setStatisticsGatheringService(StatisticsGatheringService statisticsGatheringService) { + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { this.statisticsGatheringService = statisticsGatheringService; } @VisibleForTesting - protected void setStatisticsGatheringOnTheFlyService(StatisticsGatheringOnTheFlyService + void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } @Override - public ItemLifecycleListener getItemLifeCycleListener() { + public ItemLifecycleListener getItemLifeCycleListener () { return itemLifeCycleListener; } + + @Override + public CONTEXT_STATE getState() { + return this.state; + } + + @Override + public void setState(CONTEXT_STATE state) { + this.state = state; + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; + } + + @Override + public void startupClusterServices() throws ExecutionException, InterruptedException { + if (!this.shuttingDownStatisticsPolling) { + this.statListForCollectingInitialization(); + this.initialGatherDynamicData(); + myManager.startScheduling(deviceInfo); + } + } + + @Override + public ListenableFuture stopClusterServices(boolean deviceDisconnected) { + myManager.stopScheduling(deviceInfo); + return Futures.immediateFuture(null); + } + + @Override + public LifecycleService getLifecycleService() { + return lifecycleService; + } }