import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceSynchronizedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.slf4j.Logger;
public class StatisticsManagerImpl implements StatisticsManager {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private DeviceSynchronizedHandler deviceSynchronizedHandler;
- public StatisticsManagerImpl() {
+ private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+ private HashedWheelTimer hashedWheelTimer;
+
+ private final ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap<>();
+
+
+ private static final long basicTimerDelay = 3000;
+ private static long currentTimerDelay = basicTimerDelay;
+ private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
+
+ @Override
+ public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
+ deviceInitPhaseHandler = handler;
}
@Override
- public void deviceConnected(final DeviceContext deviceContext) {
+ public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+
+ if (null == hashedWheelTimer) {
+ LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
+ hashedWheelTimer = deviceContext.getTimer();
+ }
+
final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
- final ListenableFuture<Void> weHaveDynamicData = statisticsContext.gatherDynamicData();
- Futures.addCallback(weHaveDynamicData, new FutureCallback<Void>() {
+ deviceContext.addDeviceContextClosedHandler(this);
+ final ListenableFuture<Boolean> weHaveDynamicData = statisticsContext.gatherDynamicData();
+ Futures.addCallback(weHaveDynamicData, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(final Boolean statisticsGathered) {
+ if (statisticsGathered.booleanValue()) {
+ //there are some statistics on device worth gathering
+ contexts.put(deviceContext, statisticsContext);
+ final TimeCounter timeCounter = new TimeCounter();
+ pollStatistics(deviceContext, statisticsContext, timeCounter);
+ }
+ LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
+ deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
+ deviceContext.getDeviceState().setDeviceSynchronized(true);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Statistics manager was not able to collect dynamic info for device.", deviceContext.getDeviceState().getNodeId(), throwable);
+ try {
+ deviceContext.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing device context.", e);
+ }
+ }
+ });
+ }
+
+ private void pollStatistics(final DeviceContext deviceContext,
+ final StatisticsContext statisticsContext,
+ final TimeCounter timeCounter) {
+ timeCounter.markStart();
+ ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+ Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
- public void onSuccess(final Void aVoid) {
- deviceSynchronizedHandler.deviceConnected(deviceContext);
+ public void onSuccess(final Boolean o) {
+ timeCounter.addTimeMark();
+ calculateTimerDelay(timeCounter);
+ scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.error("Statistics manager was not able to collect dynamic info for device {}", deviceContext.getDeviceState().getNodeId());
+ timeCounter.addTimeMark();
+ LOG.info("Statistics gathering for single node was not successful: {}", throwable.getMessage());
+ LOG.debug("Statistics gathering for single node was not successful.. ", throwable);
+ if (ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+ calculateTimerDelay(timeCounter);
+ scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
+ }
}
});
+ }
+ private void scheduleNextPolling(final DeviceContext deviceContext,
+ final StatisticsContext statisticsContext,
+ final TimeCounter timeCounter) {
+ if (null != hashedWheelTimer) {
+ hashedWheelTimer.newTimeout(new TimerTask() {
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ pollStatistics(deviceContext, statisticsContext, timeCounter);
+ }
+ }, currentTimerDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void calculateTimerDelay(final TimeCounter timeCounter) {
+ long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+ if (averageStatisticsGatheringTime > currentTimerDelay) {
+ currentTimerDelay *= 2;
+ if (currentTimerDelay > maximumTimerDelay) {
+ currentTimerDelay = maximumTimerDelay;
+ }
+ } else {
+ if (currentTimerDelay > basicTimerDelay) {
+ currentTimerDelay /= 2;
+ } else {
+ currentTimerDelay = basicTimerDelay;
+ }
+ }
}
@Override
- public void addRequestDeviceSynchronizedHandler(final DeviceSynchronizedHandler deviceSynchronizedHandler) {
- this.deviceSynchronizedHandler = deviceSynchronizedHandler;
+ public void onDeviceContextClosed(final DeviceContext deviceContext) {
+ StatisticsContext statisticsContext = contexts.remove(deviceContext);
+ if (null != statisticsContext) {
+ LOG.trace("Removing device context from stack. No more statistics gathering for node {}", deviceContext.getDeviceState().getNodeId());
+ try {
+ statisticsContext.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing statistic context for node {}.", deviceContext.getDeviceState().getNodeId());
+ }
+ }
}
+ private final class TimeCounter {
+ private long beginningOfTime;
+ private long delta;
+ private int marksCount = 0;
+
+ public void markStart() {
+ beginningOfTime = System.nanoTime();
+ delta = 0;
+ marksCount = 0;
+ }
+
+ public void addTimeMark() {
+ delta += System.nanoTime() - beginningOfTime;
+ marksCount++;
+ }
+
+ public long getAverageTimeBetweenMarks() {
+ long average = 0;
+ if (marksCount > 0) {
+ average = delta / marksCount;
+ }
+ return TimeUnit.NANOSECONDS.toMillis(average);
+ }
+
+ }
}