package org.opendaylight.openflowplugin.impl.statistics;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 1.4.2015.
- */
public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private final RpcProviderRegistry rpcProviderRegistry;
- private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+ private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
+ private final ConvertorExecutor convertorExecutor;
- private HashedWheelTimer hashedWheelTimer;
+ private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+ private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
- private final ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceInfo, StatisticsContext> contexts = new ConcurrentHashMap<>();
private static final long basicTimerDelay = 3000;
private static long currentTimerDelay = basicTimerDelay;
- private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
+ private static final long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
- private Semaphore workModeGuard = new Semaphore(1, true);
+ private final Semaphore workModeGuard = new Semaphore(1, true);
private boolean shuttingDownStatisticsPolling;
private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
+ private final LifecycleConductor conductor;
+
@Override
public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
deviceInitPhaseHandler = handler;
}
- public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry) {
- this.rpcProviderRegistry = rpcProviderRegistry;
- controlServiceRegistration = rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this);
- }
-
- public StatisticsManagerImpl(RpcProviderRegistry rpcProviderRegistry, final boolean shuttingDownStatisticsPolling) {
- this(rpcProviderRegistry);
+ public StatisticsManagerImpl(@CheckForNull final RpcProviderRegistry rpcProviderRegistry,
+ final boolean shuttingDownStatisticsPolling,
+ final LifecycleConductor lifecycleConductor,
+ final ConvertorExecutor convertorExecutor) {
+ this.convertorExecutor = convertorExecutor;
+ Preconditions.checkArgument(rpcProviderRegistry != null);
+ this.controlServiceRegistration = Preconditions.checkNotNull(rpcProviderRegistry.addRpcImplementation(
+ StatisticsManagerControlService.class, this));
this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
+ this.conductor = lifecycleConductor;
}
@Override
- public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
- LOG.debug("Node:{}, deviceContext.getDeviceState().getRole():{}", deviceContext.getDeviceState().getNodeId(),
- deviceContext.getDeviceState().getRole());
- if (deviceContext.getDeviceState().getRole() == OfpRole.BECOMESLAVE) {
- // if slave, we dont poll for statistics and jump to rpc initialization
- LOG.info("Skipping Statistics for slave role for node:{}", deviceContext.getDeviceState().getNodeId());
- deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
- return;
- }
+ public void onDeviceContextLevelUp(final DeviceInfo deviceInfo) throws Exception {
- if (null == hashedWheelTimer) {
- LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
- hashedWheelTimer = deviceContext.getTimer();
- }
+ final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceInfo, shuttingDownStatisticsPolling, conductor, convertorExecutor);
+ Verify.verify(contexts.putIfAbsent(deviceInfo, statisticsContext) == null, "StatisticsCtx still not closed for Node {}", deviceInfo.getNodeId());
- LOG.info("Starting Statistics for master role for node:{}", deviceContext.getDeviceState().getNodeId());
+ deviceInitPhaseHandler.onDeviceContextLevelUp(deviceInfo);
+ }
- final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
- deviceContext.addDeviceContextClosedHandler(this);
- final ListenableFuture<Boolean> weHaveDynamicData = statisticsContext.gatherDynamicData();
- Futures.addCallback(weHaveDynamicData, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(final Boolean statisticsGathered) {
- if (statisticsGathered) {
- //there are some statistics on device worth gathering
- contexts.put(deviceContext, statisticsContext);
- final TimeCounter timeCounter = new TimeCounter();
- scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
- LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
- deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
- deviceContext.getDeviceState().setDeviceSynchronized(true);
- } else {
- final String deviceAdress = deviceContext.getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress().toString();
- try {
- deviceContext.close();
- } catch (Exception e) {
- LOG.info("Statistics for device {} could not be gathered. Closing its device context.", deviceAdress);
- }
- }
- }
+ @VisibleForTesting
+ void pollStatistics(final DeviceState deviceState,
+ final StatisticsContext statisticsContext,
+ final TimeCounter timeCounter,
+ final DeviceInfo deviceInfo) {
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Statistics manager was not able to collect dynamic info for device.", deviceContext.getDeviceState().getNodeId(), throwable);
- try {
- deviceContext.close();
- } catch (Exception e) {
- LOG.warn("Error closing device context.", e);
- }
- }
- });
- }
+ if (!statisticsContext.isSchedulingEnabled()) {
+ LOG.debug("Disabling statistics scheduling for device: {}", deviceInfo.getNodeId());
+ return;
+ }
+
+ if (!deviceState.isValid()) {
+ LOG.debug("Session is not valid for device: {}", deviceInfo.getNodeId());
+ return;
+ }
- private void pollStatistics(final DeviceContext deviceContext,
- final StatisticsContext statisticsContext,
- final TimeCounter timeCounter) {
- LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
+ if (!deviceState.isStatisticsPollingEnabled()) {
+ LOG.debug("Statistics polling is currently disabled for device: {}", deviceInfo.getNodeId());
+ scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+ return;
+ }
+
+ LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
timeCounter.markStart();
- ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+ final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(final Boolean o) {
timeCounter.addTimeMark();
calculateTimerDelay(timeCounter);
- scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+ scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
}
@Override
- public void onFailure(final Throwable throwable) {
+ public void onFailure(@Nonnull final Throwable throwable) {
timeCounter.addTimeMark();
- LOG.info("Statistics gathering for single node was not successful: {}", throwable.getMessage());
- LOG.debug("Statistics gathering for single node was not successful.. ", throwable);
+ LOG.warn("Statistics gathering for single node was not successful: {}", throwable.getMessage());
+ LOG.trace("Statistics gathering for single node was not successful.. ", throwable);
calculateTimerDelay(timeCounter);
- scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+ if (throwable instanceof CancellationException) {
+ /* This often happens when something wrong with akka or DS, so closing connection will help to restart device **/
+ conductor.closeConnection(deviceInfo);
+ } else {
+ scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
+ }
}
});
- final long STATS_TIMEOUT_SEC = 20L;
- try {
- deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
- } catch (TimeoutException e) {
- LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
- }
+ final long averageTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
+ final long STATS_TIMEOUT_SEC = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT_SEC;
+ final TimerTask timerTask = timeout -> {
+ if (!deviceStatisticsCollectionFuture.isDone()) {
+ LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceInfo.getNodeId(), STATS_TIMEOUT_SEC);
+ deviceStatisticsCollectionFuture.cancel(true);
+ }
+ };
+
+ conductor.newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
}
- private void scheduleNextPolling(final DeviceContext deviceContext,
+ private void scheduleNextPolling(final DeviceState deviceState,
+ final DeviceInfo deviceInfo,
final StatisticsContext statisticsContext,
final TimeCounter timeCounter) {
- if (null != hashedWheelTimer) {
- LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
- if (!shuttingDownStatisticsPolling) {
- Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
- @Override
- public void run(final Timeout timeout) throws Exception {
- pollStatistics(deviceContext, statisticsContext, timeCounter);
- }
- }, currentTimerDelay, TimeUnit.MILLISECONDS);
- statisticsContext.setPollTimeout(pollTimeout);
- }
- } else {
- LOG.debug("#!NOT SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
+ LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
+ if (!shuttingDownStatisticsPolling) {
+ final Timeout pollTimeout = conductor.newTimeout(timeout -> pollStatistics(deviceState, statisticsContext, timeCounter, deviceInfo), currentTimerDelay, TimeUnit.MILLISECONDS);
+ statisticsContext.setPollTimeout(pollTimeout);
}
}
@VisibleForTesting
- protected void calculateTimerDelay(final TimeCounter timeCounter) {
- long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+ void calculateTimerDelay(final TimeCounter timeCounter) {
+ final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
if (averageStatisticsGatheringTime > currentTimerDelay) {
currentTimerDelay *= 2;
if (currentTimerDelay > maximumTimerDelay) {
}
@VisibleForTesting
- protected static long getCurrentTimerDelay() {
+ static long getCurrentTimerDelay() {
return currentTimerDelay;
}
@Override
- public void onDeviceContextClosed(final DeviceContext deviceContext) {
- StatisticsContext statisticsContext = contexts.remove(deviceContext);
+ public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
+ final StatisticsContext statisticsContext = contexts.remove(deviceInfo);
if (null != statisticsContext) {
- LOG.trace("Removing device context from stack. No more statistics gathering for node {}", deviceContext.getDeviceState().getNodeId());
- try {
- statisticsContext.close();
- } catch (Exception e) {
- LOG.debug("Error closing statistic context for node {}.", deviceContext.getDeviceState().getNodeId());
- }
+ LOG.trace("Removing device context from stack. No more statistics gathering for device: {}", deviceInfo.getNodeId());
+ statisticsContext.close();
}
+ deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
@Override
public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
- GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
+ final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
smModeOutputBld.setMode(workMode);
return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
}
if (!workMode.equals(targetWorkMode)) {
shuttingDownStatisticsPolling = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode);
// iterate through stats-ctx: propagate mode
- for (Map.Entry<DeviceContext, StatisticsContext> contextEntry : contexts.entrySet()) {
- final DeviceContext deviceContext = contextEntry.getKey();
- final StatisticsContext statisticsContext = contextEntry.getValue();
+ for (Map.Entry<DeviceInfo, StatisticsContext> entry : contexts.entrySet()) {
switch (targetWorkMode) {
case COLLECTALL:
- scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
- for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+ scheduleNextPolling(conductor.getDeviceContext(entry.getKey()).getDeviceState(), entry.getKey(), entry.getValue(), new TimeCounter());
+ for (final ItemLifeCycleSource lifeCycleSource : conductor.getDeviceContext(entry.getKey()).getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
lifeCycleSource.setItemLifecycleListener(null);
}
break;
case FULLYDISABLED:
- final Optional<Timeout> pollTimeout = statisticsContext.getPollTimeout();
+ final Optional<Timeout> pollTimeout = entry.getValue().getPollTimeout();
if (pollTimeout.isPresent()) {
pollTimeout.get().cancel();
}
- for (ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
- lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
+ for (final ItemLifeCycleSource lifeCycleSource : conductor.getDeviceContext(entry.getKey()).getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
+ lifeCycleSource.setItemLifecycleListener(entry.getValue().getItemLifeCycleListener());
}
break;
default:
- LOG.warn("statistics work mode not supported: {}", targetWorkMode);
+ LOG.warn("Statistics work mode not supported: {}", targetWorkMode);
}
}
workMode = targetWorkMode;
return result;
}
+ @Override
+ public void startScheduling(final DeviceInfo deviceInfo) {
+ if (shuttingDownStatisticsPolling) {
+ LOG.info("Statistics are shut down for device: {}", deviceInfo.getNodeId());
+ return;
+ }
+
+ final StatisticsContext statisticsContext = contexts.get(deviceInfo);
+
+ if (statisticsContext == null) {
+ LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId());
+ return;
+ }
+
+ if (statisticsContext.isSchedulingEnabled()) {
+ LOG.debug("Statistics scheduling is already enabled for device: {}", deviceInfo.getNodeId());
+ return;
+ }
+
+ LOG.info("Scheduling statistics poll for device: {}", deviceInfo.getNodeId());
+
+ statisticsContext.setSchedulingEnabled(true);
+ scheduleNextPolling(conductor.getDeviceContext(deviceInfo).getDeviceState(), deviceInfo, statisticsContext, new TimeCounter());
+ }
+
+ @Override
+ public void stopScheduling(final DeviceInfo deviceInfo) {
+ LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
+ final StatisticsContext statisticsContext = contexts.get(deviceInfo);
+
+ if (statisticsContext == null) {
+ LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId());
+ return;
+ }
+
+ statisticsContext.setSchedulingEnabled(false);
+ }
+
@Override
public void close() {
if (controlServiceRegistration != null) {
controlServiceRegistration.close();
controlServiceRegistration = null;
}
+ for (final Iterator<StatisticsContext> iterator = Iterators.consumingIterator(contexts.values().iterator());
+ iterator.hasNext();) {
+ iterator.next().close();
+ }
+ }
+
+ @Override
+ public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
+ this.deviceTerminPhaseHandler = handler;
+ }
+
+ @Override
+ public <T extends OFPContext> T gainContext(DeviceInfo deviceInfo) {
+ return (T) contexts.get(deviceInfo);
}
}