import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceSynchronizedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.slf4j.Logger;
public class StatisticsManagerImpl implements StatisticsManager {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private DeviceSynchronizedHandler deviceSynchronizedHandler;
- public StatisticsManagerImpl() {
+ private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+ private HashedWheelTimer hashedWheelTimer;
+
+ private ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap();
+
+ @Override
+ public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
+ deviceInitPhaseHandler = handler;
}
@Override
- public void deviceConnected(final DeviceContext deviceContext) {
- final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, null);
+ public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+
+ if (null == hashedWheelTimer) {
+ LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
+ hashedWheelTimer = deviceContext.getTimer();
+ pollStatistics();
+ }
+
+ final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
final ListenableFuture<Void> weHaveDynamicData = statisticsContext.gatherDynamicData();
Futures.addCallback(weHaveDynamicData, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void aVoid) {
- deviceSynchronizedHandler.deviceConnected(deviceContext);
+ // wake up RPC registration
+ LOG.trace("Device dynamic info collected. Going to announce raise to next level.");
+ contexts.put(deviceContext, statisticsContext);
+ deviceContext.getDeviceState().setDeviceSynchronized(true);
+ deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.error("Statistics manager was not able to collect dynamic info for device {}", deviceContext.getDeviceState().getNodeId());
+ LOG.warn("Statistics manager was not able to collect dynamic info for device {}", deviceContext.getDeviceState().getNodeId(), throwable);
}
});
-
}
- @Override
- public void addRequestDeviceSynchronizedHandler(final DeviceSynchronizedHandler deviceSynchronizedHandler) {
- this.deviceSynchronizedHandler = deviceSynchronizedHandler;
- }
+ private void pollStatistics() {
+ for (final StatisticsContext statisticsContext : contexts.values()) {
+ ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+ Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() {
+ @Override
+ public void onSuccess(final Object o) {
+ //nothing to do here
+ }
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.info("Statistics gathering for single node was not successful.");
+ }
+ });
+ }
+ if (null != hashedWheelTimer) {
+ hashedWheelTimer.newTimeout(new TimerTask() {
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ pollStatistics();
+ }
+ }, 3000, TimeUnit.MILLISECONDS);
+ }
+ }
}