X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsContextImpl.java;h=c00a45fbca5523a7e684db47a45c6fe85e83f409;hb=79cd16be95f1f2c501794fc77e9e4a1cb1d5260c;hp=ea09df7573e0615c5578fade4d1b531c91251ce1;hpb=9b804bda458df55dbbb04bdaec733cac43a1008e;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java index ea09df7573..c00a45fbca 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java @@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -23,28 +22,32 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import javax.annotation.CheckForNull; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl; import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService; import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Created by Martin Bobak <mbobak@cisco.com> on 1.4.2015. - */ -public class StatisticsContextImpl implements StatisticsContext { +class StatisticsContextImpl implements StatisticsContext { private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class); private static final String CONNECTION_CLOSED = "Connection closed."; @@ -56,24 +59,36 @@ public class StatisticsContextImpl implements StatisticsContext { private final ListenableFuture emptyFuture; private final boolean shuttingDownStatisticsPolling; private final Object COLLECTION_STAT_TYPE_LOCK = new Object(); + private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator; @GuardedBy("COLLECTION_STAT_TYPE_LOCK") private List collectingStatType; private StatisticsGatheringService statisticsGatheringService; private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService; private Timeout pollTimeout; - - public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext, - final boolean shuttingDownStatisticsPolling) { - this.deviceContext = Preconditions.checkNotNull(deviceContext); + private final DeviceInfo deviceInfo; + private final StatisticsManager myManager; + + private volatile boolean schedulingEnabled; + private volatile CONTEXT_STATE state; + + StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo, + @Nonnull final boolean shuttingDownStatisticsPolling, + @Nonnull final LifecycleConductor lifecycleConductor, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final StatisticsManager myManager) { + this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo)); this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState()); this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling; - emptyFuture = Futures.immediateFuture(new Boolean(false)); + multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor); + emptyFuture = Futures.immediateFuture(false); statisticsGatheringService = new StatisticsGatheringService(this, deviceContext); - statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext); + statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor); itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext); statListForCollectingInitialization(); - this.deviceContext.setStatisticsContext(StatisticsContextImpl.this); + setState(CONTEXT_STATE.INITIALIZATION); + this.deviceInfo = deviceInfo; + this.myManager = myManager; } @Override @@ -104,10 +119,20 @@ public class StatisticsContextImpl implements StatisticsContext { } } + + @Override + public ListenableFuture initialGatherDynamicData() { + return gatherDynamicData(true); + } + @Override - public ListenableFuture gatherDynamicData() { + public ListenableFuture gatherDynamicData(){ + return gatherDynamicData(false); + } + + private ListenableFuture gatherDynamicData(final boolean initial) { if (shuttingDownStatisticsPolling) { - LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceState().getNodeId()); + LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue()); return Futures.immediateFuture(Boolean.TRUE); } final ListenableFuture errorResultFuture = deviceConnectionCheck(); @@ -121,7 +146,7 @@ public class StatisticsContextImpl implements StatisticsContext { // write start timestamp to state snapshot container StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext); - statChainFuture(statIterator, settableStatResultFuture); + statChainFuture(statIterator, settableStatResultFuture, initial); // write end timestamp to state snapshot container Futures.addCallback(settableStatResultFuture, new FutureCallback() { @@ -138,10 +163,10 @@ public class StatisticsContextImpl implements StatisticsContext { } } - private ListenableFuture chooseStat(final MultipartType multipartType){ + private ListenableFuture chooseStat(final MultipartType multipartType, final boolean initial){ switch (multipartType) { case OFPMPFLOW: - return collectFlowStatistics(multipartType); + return collectFlowStatistics(multipartType, initial); case OFPMPTABLE: return collectTableStatistics(multipartType); case OFPMPPORTSTATS: @@ -165,7 +190,7 @@ public class StatisticsContextImpl implements StatisticsContext { @Override public RequestContext createRequestContext() { - final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.reservedXidForDeviceMessage()) { + final AbstractRequestContext ret = new AbstractRequestContext(deviceContext.reserveXidForDeviceMessage()) { @Override public void close() { requestContexts.remove(this); @@ -177,130 +202,186 @@ public class StatisticsContextImpl implements StatisticsContext { @Override public void close() { - for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); - iterator.hasNext();) { - RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); - } - if (null != pollTimeout && !pollTimeout.isExpired()) { - pollTimeout.cancel(); + if (CONTEXT_STATE.TERMINATION.equals(getState())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Statistics context is already in state TERMINATION."); + } + } else { + setState(CONTEXT_STATE.TERMINATION); + schedulingEnabled = false; + for (final Iterator> iterator = Iterators.consumingIterator(requestContexts.iterator()); + iterator.hasNext(); ) { + RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED); + } + if (null != pollTimeout && !pollTimeout.isExpired()) { + pollTimeout.cancel(); } } + } - @Override - public void setPollTimeout (Timeout pollTimeout){ - this.pollTimeout = pollTimeout; - } - - @Override - public Optional getPollTimeout () { - return Optional.fromNullable(pollTimeout); - } + @Override + public void setSchedulingEnabled(final boolean schedulingEnabled) { + this.schedulingEnabled = schedulingEnabled; + } - void statChainFuture ( final Iterator iterator, final SettableFuture resultFuture){ + @Override + public boolean isSchedulingEnabled() { + return schedulingEnabled; + } - if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - final String errMsg = String.format("Device connection is closed for Node : %s.", - deviceContext.getDeviceState().getNodeId()); - LOG.debug(errMsg); - resultFuture.setException(new IllegalStateException(errMsg)); - return; - } + @Override + public void setPollTimeout(final Timeout pollTimeout) { + this.pollTimeout = pollTimeout; + } - if (!iterator.hasNext()) { - resultFuture.set(Boolean.TRUE); - LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId()); - return; - } + @Override + public Optional getPollTimeout() { + return Optional.ofNullable(pollTimeout); + } - final MultipartType nextType = iterator.next(); - LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType); + private void statChainFuture(final Iterator iterator, final SettableFuture resultFuture, final boolean initial) { + if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + final String errMsg = String.format("Device connection is closed for Node : %s.", + getDeviceInfo().getNodeId()); + LOG.debug(errMsg); + resultFuture.setException(new IllegalStateException(errMsg)); + return; + } + if ( ! iterator.hasNext()) { + resultFuture.set(Boolean.TRUE); + LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId()); + return; + } - final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType); - Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { - @Override - public void onSuccess(final Boolean result) { - statChainFuture(iterator, resultFuture); - } + final MultipartType nextType = iterator.next(); + LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType); - @Override - public void onFailure(final Throwable t) { - resultFuture.setException(t); - } - }); - } + final ListenableFuture deviceStatisticsCollectionFuture = chooseStat(nextType, initial); + Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + statChainFuture(iterator, resultFuture, initial); + } + @Override + public void onFailure(@Nonnull final Throwable t) { + resultFuture.setException(t); + } + }); + } - /** - * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture - * which has to be returned from caller too - * - * @return - */ - @VisibleForTesting - ListenableFuture deviceConnectionCheck () { - if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { - ListenableFuture resultingFuture = SettableFuture.create(); - switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { - case RIP: - final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", - deviceContext.getPrimaryConnectionContext().getConnectionState()); - resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); - break; - default: - resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE); - break; - } - return resultingFuture; + /** + * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture + * which has to be returned from caller too + * + * @return + */ + @VisibleForTesting + ListenableFuture deviceConnectionCheck() { + if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) { + ListenableFuture resultingFuture; + switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) { + case RIP: + final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s", + deviceContext.getPrimaryConnectionContext().getConnectionState()); + resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg)); + break; + default: + resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE); + break; } - return null; + return resultingFuture; } + return null; + } - private ListenableFuture collectFlowStatistics ( final MultipartType multipartType){ - return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture; - } + //TODO: Refactor twice sending deviceContext into gatheringStatistics + private ListenableFuture collectFlowStatistics(final MultipartType multipartType, final boolean initial) { + return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringOnTheFlyService, + getDeviceInfo(), + /*MultipartType.OFPMPFLOW*/ multipartType, + deviceContext, + deviceContext, + initial, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectTableStatistics ( final MultipartType multipartType){ - return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPTABLE*/ multipartType) : emptyFuture; - } + private ListenableFuture collectTableStatistics(final MultipartType multipartType) { + return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPTABLE*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectPortStatistics ( final MultipartType multipartType){ - return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPPORTSTATS*/ multipartType) : emptyFuture; - } + private ListenableFuture collectPortStatistics(final MultipartType multipartType) { + return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPPORTSTATS*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectQueueStatistics ( final MultipartType multipartType){ - return devState.isQueueStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPQUEUE*/ multipartType) : emptyFuture; - } + private ListenableFuture collectQueueStatistics(final MultipartType multipartType) { + return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPQUEUE*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator); + } - private ListenableFuture collectGroupDescStatistics ( final MultipartType multipartType){ - return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUPDESC*/ multipartType) : emptyFuture; - } + private ListenableFuture collectGroupDescStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPGROUPDESC*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectGroupStatistics ( final MultipartType multipartType){ - return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPGROUP*/ multipartType) : emptyFuture; - } + private ListenableFuture collectGroupStatistics(final MultipartType multipartType) { + return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPGROUP*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectMeterConfigStatistics ( final MultipartType multipartType){ - return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETERCONFIG*/ multipartType) : emptyFuture; - } + private ListenableFuture collectMeterConfigStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPMETERCONFIG*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } - private ListenableFuture collectMeterStatistics ( final MultipartType multipartType){ - return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( - statisticsGatheringService, deviceContext, /*MultipartType.OFPMPMETER*/ multipartType) : emptyFuture; - } + private ListenableFuture collectMeterStatistics(final MultipartType multipartType) { + return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics( + statisticsGatheringService, + getDeviceInfo(), + /*MultipartType.OFPMPMETER*/ multipartType, + deviceContext, + deviceContext, + false, multipartReplyTranslator) : emptyFuture; + } @VisibleForTesting - protected void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { + void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) { this.statisticsGatheringService = statisticsGatheringService; } @VisibleForTesting - protected void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService + void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService) { this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService; } @@ -310,9 +391,38 @@ public class StatisticsContextImpl implements StatisticsContext { return itemLifeCycleListener; } + @Override + public CONTEXT_STATE getState() { + return this.state; + } + + @Override + public void setState(CONTEXT_STATE state) { + this.state = state; + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; + } + + @Override + public void startupClusterServices() throws ExecutionException, InterruptedException { + if (!this.shuttingDownStatisticsPolling) { + this.statListForCollectingInitialization(); + this.initialGatherDynamicData(); + myManager.startScheduling(deviceInfo); + } + } @Override - public DeviceContext getDeviceContext() { - return deviceContext; + public ListenableFuture stopClusterServices() { + myManager.stopScheduling(deviceInfo); + return Futures.immediateFuture(null); } }