X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2FStatisticsManagerImpl.java;h=392f08facb5e3d2aba96731af4be1da26b05e781;hb=c78e3f6f59566a04a5381a0c7bc55ab708dc2f2c;hp=98e6d993e42abe188772aa33c6c2445398bb4661;hpb=77a7d6170ea741d9109dee08e981af9143bc0141;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java index 98e6d993e4..392f08facb 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java @@ -5,311 +5,140 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl.statistics; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.collect.ImmutableClassToInstanceMap; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; -import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkMode; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkMode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.Rpc; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService { - +public final class StatisticsManagerImpl implements StatisticsManager { private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class); - private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L; - private final ConvertorExecutor convertorExecutor; - - private DeviceInitializationPhaseHandler deviceInitPhaseHandler; - private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; - - private final ConcurrentMap contexts = new ConcurrentHashMap<>(); - - private static final long basicTimerDelay = 3000; - private static long currentTimerDelay = basicTimerDelay; - private static final long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics - - private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL; - private final Semaphore workModeGuard = new Semaphore(1, true); - private boolean isStatisticsPollingEnabled; - private BindingAwareBroker.RpcRegistration controlServiceRegistration; - - private final HashedWheelTimer hashedWheelTimer; - - @Override - public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - deviceInitPhaseHandler = handler; - } - - public StatisticsManagerImpl(final RpcProviderRegistry rpcProviderRegistry, - final boolean isStatisticsPollingEnabled, - final HashedWheelTimer hashedWheelTimer, - final ConvertorExecutor convertorExecutor) { - Preconditions.checkArgument(rpcProviderRegistry != null); - this.convertorExecutor = convertorExecutor; - this.controlServiceRegistration = Preconditions.checkNotNull(rpcProviderRegistry.addRpcImplementation( - StatisticsManagerControlService.class, this)); - this.isStatisticsPollingEnabled = isStatisticsPollingEnabled; - this.hashedWheelTimer = hashedWheelTimer; - } - - @Override - public void onDeviceContextLevelUp(final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception { - - final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceInfo, isStatisticsPollingEnabled, lifecycleService, convertorExecutor, this); - Verify.verify(contexts.putIfAbsent(deviceInfo, statisticsContext) == null, "StatisticsCtx still not closed for Node {}", deviceInfo.getLOGValue()); - lifecycleService.setStatContext(statisticsContext); - deviceInitPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService); - } - @VisibleForTesting - void pollStatistics(final DeviceState deviceState, - final StatisticsContext statisticsContext, - final TimeCounter timeCounter, - final DeviceInfo deviceInfo) { - - if (!statisticsContext.isSchedulingEnabled()) { - LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue()); - return; - } - - LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId()); - timeCounter.markStart(); - final ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData(); - Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() { - @Override - public void onSuccess(final Boolean o) { - timeCounter.addTimeMark(); - calculateTimerDelay(timeCounter); - scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter); - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - timeCounter.addTimeMark(); - LOG.warn("Statistics gathering for single node was not successful: {}", throwable.getMessage()); - LOG.trace("Statistics gathering for single node was not successful.. ", throwable); - calculateTimerDelay(timeCounter); - if (throwable instanceof CancellationException) { - /* This often happens when something wrong with akka or DS, so closing connection will help to restart device **/ - contexts.get(deviceInfo).getLifecycleService().closeConnection(); - } else { - scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter); - } - } - }); + final ConcurrentMap contexts = new ConcurrentHashMap<>(); - final long averageTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks()); - final long STATS_TIMEOUT_SEC = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT_SEC; - final TimerTask timerTask = timeout -> { - if (!deviceStatisticsCollectionFuture.isDone()) { - LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceInfo.getLOGValue(), STATS_TIMEOUT_SEC); - deviceStatisticsCollectionFuture.cancel(true); - } - }; - - hashedWheelTimer.newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS); - } - - private void scheduleNextPolling(final DeviceState deviceState, - final DeviceInfo deviceInfo, - final StatisticsContext statisticsContext, - final TimeCounter timeCounter) { - LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId()); - if (!isStatisticsPollingEnabled) { - final Timeout pollTimeout = hashedWheelTimer.newTimeout( - timeout -> pollStatistics( - deviceState, - statisticsContext, - timeCounter, - deviceInfo), - currentTimerDelay, - TimeUnit.MILLISECONDS); - statisticsContext.setPollTimeout(pollTimeout); - } + private final OpenflowProviderConfig config; + private final ConvertorExecutor converterExecutor; + private final Executor executor; + private final Semaphore workModeGuard = new Semaphore(1, true); + private final Registration controlServiceRegistration; + private final StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL; + private boolean isStatisticsFullyDisabled; + + public StatisticsManagerImpl(@NonNull final OpenflowProviderConfig config, + @NonNull final RpcProviderService rpcProviderRegistry, + final ConvertorExecutor convertorExecutor, + @NonNull final Executor executor) { + this.config = config; + this.executor = executor; + converterExecutor = convertorExecutor; + controlServiceRegistration = rpcProviderRegistry.registerRpcImplementations( + ImmutableClassToInstanceMap.>builder() + .put(GetStatisticsWorkMode.class, this::getStatisticsWorkMode) + .put(ChangeStatisticsWorkMode.class, this::changeStatisticsWorkMode) + .build()); } @VisibleForTesting - void calculateTimerDelay(final TimeCounter timeCounter) { - final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks(); - if (averageStatisticsGatheringTime > currentTimerDelay) { - currentTimerDelay *= 2; - if (currentTimerDelay > maximumTimerDelay) { - currentTimerDelay = maximumTimerDelay; - } - } else { - if (currentTimerDelay > basicTimerDelay) { - currentTimerDelay /= 2; - } else { - currentTimerDelay = basicTimerDelay; - } - } + ListenableFuture> getStatisticsWorkMode( + final GetStatisticsWorkModeInput input) { + return RpcResultBuilder.success(new GetStatisticsWorkModeOutputBuilder() + .setMode(workMode) + .build()).buildFuture(); } @VisibleForTesting - static long getCurrentTimerDelay() { - return currentTimerDelay; - } - - @Override - public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { - final StatisticsContext statisticsContext = contexts.remove(deviceInfo); - if (null != statisticsContext) { - LOG.debug("Removing device context from stack. No more statistics gathering for device: {}", deviceInfo.getLOGValue()); - statisticsContext.close(); - } - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - - @Override - public Future> getStatisticsWorkMode() { - final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder(); - smModeOutputBld.setMode(workMode); - return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture(); - } - - @Override - public Future> changeStatisticsWorkMode(ChangeStatisticsWorkModeInput input) { - final Future> result; - // acquire exclusive access + ListenableFuture> changeStatisticsWorkMode( + final ChangeStatisticsWorkModeInput input) { if (workModeGuard.tryAcquire()) { final StatisticsWorkMode targetWorkMode = input.getMode(); - if (!workMode.equals(targetWorkMode)) { - isStatisticsPollingEnabled = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode); - // iterate through stats-ctx: propagate mode - for (Map.Entry entry : contexts.entrySet()) { - final DeviceInfo deviceInfo = entry.getKey(); - final StatisticsContext statisticsContext = entry.getValue(); - final DeviceContext deviceContext = statisticsContext.getLifecycleService().getDeviceContext(); - switch (targetWorkMode) { - case COLLECTALL: - scheduleNextPolling(deviceContext.getDeviceState(), deviceInfo, statisticsContext, new TimeCounter()); - for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) { - lifeCycleSource.setItemLifecycleListener(null); - } - break; - case FULLYDISABLED: - final Optional pollTimeout = statisticsContext.getPollTimeout(); - if (pollTimeout.isPresent()) { - pollTimeout.get().cancel(); - } - for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) { - lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener()); - } - break; - default: - LOG.warn("Statistics work mode not supported: {}", targetWorkMode); - } + isStatisticsFullyDisabled = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode); + + contexts.values().forEach(context -> { + switch (targetWorkMode) { + case COLLECTALL: + context.enableGathering(); + break; + case FULLYDISABLED: + context.disableGathering(); + break; + default: + LOG.warn("Statistics work mode not supported: {}", targetWorkMode); } - workMode = targetWorkMode; - } + }); + workModeGuard.release(); - result = RpcResultBuilder.success().buildFuture(); - } else { - result = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "mode change already in progress") - .buildFuture(); + return RpcResultBuilder.success().buildFuture(); } - return result; + + return RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "Statistics work mode change is already in progress") + .buildFuture(); } @Override - public void startScheduling(final DeviceInfo deviceInfo) { - if (isStatisticsPollingEnabled) { - LOG.info("Statistics are shut down for device: {}", deviceInfo.getNodeId()); - return; - } - - final StatisticsContext statisticsContext = contexts.get(deviceInfo); - - if (statisticsContext == null) { - LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId()); - return; - } - - if (statisticsContext.isSchedulingEnabled()) { - LOG.debug("Statistics scheduling is already enabled for device: {}", deviceInfo.getNodeId()); - return; - } - - LOG.info("Scheduling statistics poll for device: {}", deviceInfo.getNodeId()); - - statisticsContext.setSchedulingEnabled(true); - final DeviceState deviceState = contexts.get(deviceInfo).getLifecycleService().getDeviceContext().getDeviceState(); - scheduleNextPolling(deviceState, deviceInfo, statisticsContext, new TimeCounter()); + public StatisticsContext createContext(@NonNull final DeviceContext deviceContext, + final boolean useReconciliationFramework) { + final MultipartWriterProvider statisticsWriterProvider = MultipartWriterProviderFactory + .createDefaultProvider(deviceContext); + + final StatisticsContext statisticsContext = new StatisticsContextImpl<>( + deviceContext, + converterExecutor, + statisticsWriterProvider, + executor, + config, + !isStatisticsFullyDisabled && config.getIsStatisticsPollingOn(), + useReconciliationFramework); + + contexts.put(deviceContext.getDeviceInfo(), statisticsContext); + return statisticsContext; } @Override - public void stopScheduling(final DeviceInfo deviceInfo) { - LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId()); - final StatisticsContext statisticsContext = contexts.get(deviceInfo); - - if (statisticsContext == null) { - LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId()); - return; - } - - statisticsContext.setSchedulingEnabled(false); + public void onDeviceRemoved(final DeviceInfo deviceInfo) { + contexts.remove(deviceInfo); + LOG.debug("Statistics context removed for node {}", deviceInfo); } @Override public void close() { - if (controlServiceRegistration != null) { - controlServiceRegistration.close(); - controlServiceRegistration = null; - } - for (final Iterator iterator = Iterators.consumingIterator(contexts.values().iterator()); - iterator.hasNext();) { - iterator.next().close(); - } - } + isStatisticsFullyDisabled = true; + controlServiceRegistration.close(); - @Override - public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) { - this.deviceTerminPhaseHandler = handler; - } + for (StatisticsContext context : contexts.values()) { + context.close(); + } - @Override - public T gainContext(DeviceInfo deviceInfo) { - return (T) contexts.get(deviceInfo); + contexts.clear(); } }