X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsManagerImpl.java;h=299c8783be3a1d416ea5407d69077bb844eb796e;hb=8c93640d1e280f5742331ef7599a517de1163d97;hp=86933a4a2e497bb85afe49f248ddd02471368866;hpb=4ad505cc51f9bda8290c6191e4e16a5d36ce3b27;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java index 86933a4a2e..299c8783be 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java @@ -5,376 +5,136 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.openflowplugin.api.ConnectionException; -import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; -import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; -import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.MultipartReply; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkMode; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkMode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService { - +public final class StatisticsManagerImpl implements StatisticsManager { private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class); - private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L; - private final ConvertorExecutor converterExecutor; - private DeviceInitializationPhaseHandler deviceInitPhaseHandler; - private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler; - - private final ConcurrentMap contexts = new ConcurrentHashMap<>(); - - private static long basicTimerDelay; - private static long currentTimerDelay; - private static long maximumTimerDelay; //wait time for next statistics + @VisibleForTesting + final ConcurrentMap contexts = new ConcurrentHashMap<>(); - private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL; + private final OpenflowProviderConfig config; + private final ConvertorExecutor converterExecutor; + private final Executor executor; private final Semaphore workModeGuard = new Semaphore(1, true); - private boolean isStatisticsPollingOn; - private BindingAwareBroker.RpcRegistration controlServiceRegistration; - - private final HashedWheelTimer hashedWheelTimer; - - @Override - public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - deviceInitPhaseHandler = handler; - } + private final Registration controlServiceRegistration; + private final StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL; + private boolean isStatisticsFullyDisabled; - public StatisticsManagerImpl(final RpcProviderRegistry rpcProviderRegistry, - final boolean isStatisticsPollingOn, - final HashedWheelTimer hashedWheelTimer, + public StatisticsManagerImpl(@NonNull final OpenflowProviderConfig config, + @NonNull final RpcProviderService rpcProviderRegistry, final ConvertorExecutor convertorExecutor, - final long basicTimerDelay, - final long maximumTimerDelay) { - Preconditions.checkArgument(rpcProviderRegistry != null); - this.converterExecutor = convertorExecutor; - this.controlServiceRegistration = Preconditions.checkNotNull( - rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this) - ); - this.isStatisticsPollingOn = isStatisticsPollingOn; - this.basicTimerDelay = basicTimerDelay; - this.currentTimerDelay = basicTimerDelay; - this.maximumTimerDelay = maximumTimerDelay; - this.hashedWheelTimer = hashedWheelTimer; - } - - @Override - public void onDeviceContextLevelUp(final DeviceInfo deviceInfo, - final LifecycleService lifecycleService) throws Exception { - - deviceInitPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService); - } - - @VisibleForTesting - void pollStatistics(final DeviceState deviceState, - final StatisticsContext statisticsContext, - final TimeCounter timeCounter, - final DeviceInfo deviceInfo) { - - if (!statisticsContext.isSchedulingEnabled()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue()); - } - return; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId()); - } - - timeCounter.markStart(); - final ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData(); - Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { - @Override - public void onSuccess(final Boolean o) { - timeCounter.addTimeMark(); - calculateTimerDelay(timeCounter); - scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter); - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - timeCounter.addTimeMark(); - LOG.warn("Statistics gathering for single node {} was not successful: {}", deviceInfo.getLOGValue(), - throwable.getMessage()); - if (LOG.isTraceEnabled()) { - LOG.trace("Gathering for node {} failure: ", deviceInfo.getLOGValue(), throwable); - } - calculateTimerDelay(timeCounter); - if (throwable instanceof ConnectionException) { - // ConnectionException is raised by StatisticsContextImpl class when the connections - // move to RIP state. In this particular case, there is no need to reschedule - // because this statistics manager should be closed soon - LOG.warn("Node {} is no more connected, stopping the statistics collection", - deviceInfo.getLOGValue(),throwable); - stopScheduling(deviceInfo); - } else { - if (!(throwable instanceof CancellationException)) { - LOG.warn("Unexpected error occurred during statistics collection for node {}, rescheduling " + - "statistics collections", deviceInfo.getLOGValue(),throwable); - } - scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter); - } - } - }); - - final long averageTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks()); - final long statsTimeoutSec = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT_SEC; - final TimerTask timerTask = timeout -> { - if (!deviceStatisticsCollectionFuture.isDone()) { - LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceInfo.getLOGValue(), statsTimeoutSec); - deviceStatisticsCollectionFuture.cancel(true); - } - }; - - hashedWheelTimer.newTimeout(timerTask, statsTimeoutSec, TimeUnit.SECONDS); - } - - private void scheduleNextPolling(final DeviceState deviceState, - final DeviceInfo deviceInfo, - final StatisticsContext statisticsContext, - final TimeCounter timeCounter) { - if (LOG.isDebugEnabled()) { - LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId()); - } - if (isStatisticsPollingOn) { - final Timeout pollTimeout = hashedWheelTimer.newTimeout( - timeout -> pollStatistics( - deviceState, - statisticsContext, - timeCounter, - deviceInfo), - currentTimerDelay, - TimeUnit.MILLISECONDS); - statisticsContext.setPollTimeout(pollTimeout); - } + @NonNull final Executor executor) { + this.config = config; + this.executor = executor; + converterExecutor = convertorExecutor; + controlServiceRegistration = rpcProviderRegistry.registerRpcImplementations( + (GetStatisticsWorkMode) this::getStatisticsWorkMode, + (ChangeStatisticsWorkMode) this::changeStatisticsWorkMode); } @VisibleForTesting - void calculateTimerDelay(final TimeCounter timeCounter) { - final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks(); - if (averageStatisticsGatheringTime > currentTimerDelay) { - currentTimerDelay *= 2; - if (currentTimerDelay > maximumTimerDelay) { - currentTimerDelay = maximumTimerDelay; - } - } else { - if (currentTimerDelay > basicTimerDelay) { - currentTimerDelay /= 2; - } else { - currentTimerDelay = basicTimerDelay; - } - } + ListenableFuture> getStatisticsWorkMode( + final GetStatisticsWorkModeInput input) { + return RpcResultBuilder.success(new GetStatisticsWorkModeOutputBuilder() + .setMode(workMode) + .build()).buildFuture(); } @VisibleForTesting - static long getCurrentTimerDelay() { - return currentTimerDelay; - } - - @Override - public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { - Optional.ofNullable(contexts.get(deviceInfo)).ifPresent(OFPContext::close); - deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - - @Override - public Future> getStatisticsWorkMode() { - final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder(); - smModeOutputBld.setMode(workMode); - return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture(); - } - - @Override - public Future> changeStatisticsWorkMode(ChangeStatisticsWorkModeInput input) { - final Future> result; - // acquire exclusive access + ListenableFuture> changeStatisticsWorkMode( + final ChangeStatisticsWorkModeInput input) { if (workModeGuard.tryAcquire()) { final StatisticsWorkMode targetWorkMode = input.getMode(); - if (!workMode.equals(targetWorkMode)) { - isStatisticsPollingOn = !(StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode)); - // iterate through stats-ctx: propagate mode - for (Map.Entry entry : contexts.entrySet()) { - final DeviceInfo deviceInfo = entry.getKey(); - final StatisticsContext statisticsContext = entry.getValue(); - final DeviceContext deviceContext = statisticsContext.gainDeviceContext(); - switch (targetWorkMode) { - case COLLECTALL: - scheduleNextPolling(statisticsContext.gainDeviceState(), deviceInfo, statisticsContext, new TimeCounter()); - for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) { - lifeCycleSource.setItemLifecycleListener(null); - } - break; - case FULLYDISABLED: - final Optional pollTimeout = statisticsContext.getPollTimeout(); - if (pollTimeout.isPresent()) { - pollTimeout.get().cancel(); - } - for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) { - lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener()); - } - break; - default: - LOG.warn("Statistics work mode not supported: {}", targetWorkMode); - } + isStatisticsFullyDisabled = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode); + + contexts.values().forEach(context -> { + switch (targetWorkMode) { + case COLLECTALL: + context.enableGathering(); + break; + case FULLYDISABLED: + context.disableGathering(); + break; + default: + LOG.warn("Statistics work mode not supported: {}", targetWorkMode); } - workMode = targetWorkMode; - } - workModeGuard.release(); - result = RpcResultBuilder.success().buildFuture(); - } else { - result = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "mode change already in progress") - .buildFuture(); - } - return result; - } - - @Override - public void startScheduling(final DeviceInfo deviceInfo) { - if (!isStatisticsPollingOn) { - LOG.info("Statistics are shutdown for device: {}", deviceInfo.getNodeId()); - return; - } - - final StatisticsContext statisticsContext = contexts.get(deviceInfo); - - if (statisticsContext == null) { - LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId()); - return; - } - - if (statisticsContext.isSchedulingEnabled()) { - LOG.debug("Statistics scheduling is already enabled for device: {}", deviceInfo.getNodeId()); - return; - } - - LOG.info("Scheduling statistics poll for device: {}", deviceInfo.getNodeId()); - - statisticsContext.setSchedulingEnabled(true); - scheduleNextPolling( - statisticsContext.gainDeviceState(), - deviceInfo, - statisticsContext, - new TimeCounter() - ); - } - - @Override - public void stopScheduling(final DeviceInfo deviceInfo) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId()); - } - - final StatisticsContext statisticsContext = contexts.get(deviceInfo); - - if (statisticsContext == null) { - LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId()); - return; - } - - statisticsContext.setSchedulingEnabled(false); - } + }); - @Override - public void close() { - if (controlServiceRegistration != null) { - controlServiceRegistration.close(); - controlServiceRegistration = null; + workModeGuard.release(); + return RpcResultBuilder.success().buildFuture(); } - for (final Iterator iterator = Iterators.consumingIterator(contexts.values().iterator()); - iterator.hasNext();) { - iterator.next().close(); - } - } - - @Override - public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { - this.deviceTerminationPhaseHandler = handler; - } - - @Override - public void setIsStatisticsPollingOn(boolean isStatisticsPollingOn){ - this.isStatisticsPollingOn = isStatisticsPollingOn; + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Statistics work mode change is already in progress") + .buildFuture(); } @Override - public StatisticsContext createContext(@Nonnull final DeviceContext deviceContext) { - + public StatisticsContext createContext(@NonNull final DeviceContext deviceContext, + final boolean useReconciliationFramework) { final MultipartWriterProvider statisticsWriterProvider = MultipartWriterProviderFactory - .createDefaultProvider(deviceContext); + .createDefaultProvider(deviceContext); - return deviceContext.canUseSingleLayerSerialization() ? - new StatisticsContextImpl( - isStatisticsPollingOn, - deviceContext, - converterExecutor, - this, - statisticsWriterProvider) : - new StatisticsContextImpl( - isStatisticsPollingOn, + final StatisticsContext statisticsContext = new StatisticsContextImpl<>( deviceContext, converterExecutor, - this, - statisticsWriterProvider); - } + statisticsWriterProvider, + executor, + config, + !isStatisticsFullyDisabled && config.getIsStatisticsPollingOn(), + useReconciliationFramework); - public void onDeviceRemoved(DeviceInfo deviceInfo) { - contexts.remove(deviceInfo); - LOG.debug("Statistics context removed for node {}", deviceInfo.getLOGValue()); + contexts.put(deviceContext.getDeviceInfo(), statisticsContext); + return statisticsContext; } @Override - public void setBasicTimerDelay(final long basicTimerDelay) { - this.basicTimerDelay = basicTimerDelay; + public void onDeviceRemoved(final DeviceInfo deviceInfo) { + contexts.remove(deviceInfo); + LOG.debug("Statistics context removed for node {}", deviceInfo); } @Override - public void setMaximumTimerDelay(final long maximumTimerDelay) { - this.maximumTimerDelay = maximumTimerDelay; + public void close() { + isStatisticsFullyDisabled = true; + controlServiceRegistration.close(); + + for (StatisticsContext context : contexts.values()) { + context.close(); + } + + contexts.clear(); } }