correct way of group statistics processing
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsManagerImpl.java
index 427f1cf5f243e2f225c0526d1efa832661cf1beb..4af8757e91eaff18033904879908c17a18603361 100644 (file)
@@ -11,8 +11,13 @@ package org.opendaylight.openflowplugin.impl.statistics;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceSynchronizedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
 import org.slf4j.Logger;
@@ -24,33 +29,68 @@ import org.slf4j.LoggerFactory;
 public class StatisticsManagerImpl implements StatisticsManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
-    private DeviceSynchronizedHandler deviceSynchronizedHandler;
 
-    public StatisticsManagerImpl() {
+    private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
 
+    private HashedWheelTimer hashedWheelTimer;
+
+    private ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap();
+
+    @Override
+    public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
+        deviceInitPhaseHandler = handler;
     }
 
     @Override
-    public void deviceConnected(final DeviceContext deviceContext) {
-        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext, null);
+    public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+
+        if (null == hashedWheelTimer) {
+            LOG.trace("This is first device that delivered timer. Starting statistics polling immediately.");
+            hashedWheelTimer = deviceContext.getTimer();
+            pollStatistics();
+        }
+
+        final StatisticsContext statisticsContext = new StatisticsContextImpl(deviceContext);
         final ListenableFuture<Void> weHaveDynamicData = statisticsContext.gatherDynamicData();
         Futures.addCallback(weHaveDynamicData, new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void aVoid) {
-                deviceSynchronizedHandler.deviceConnected(deviceContext);
+                // wake up RPC registration
+                LOG.trace("Device dynamic info collected. Going to announce raise to next level.");
+                contexts.put(deviceContext, statisticsContext);
+                deviceContext.getDeviceState().setDeviceSynchronized(true);
+                deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.error("Statistics manager was not able to collect dynamic info for device {}", deviceContext.getDeviceState().getNodeId());
+                LOG.warn("Statistics manager was not able to collect dynamic info for device {}", deviceContext.getDeviceState().getNodeId(), throwable);
             }
         });
-
     }
 
-    @Override
-    public void addRequestDeviceSynchronizedHandler(final DeviceSynchronizedHandler deviceSynchronizedHandler) {
-        this.deviceSynchronizedHandler = deviceSynchronizedHandler;
-    }
+    private void pollStatistics() {
+        for (final StatisticsContext statisticsContext : contexts.values()) {
+            ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+            Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() {
+                @Override
+                public void onSuccess(final Object o) {
+                    //nothing to do here
+                }
 
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.info("Statistics gathering for single node was not successful.");
+                }
+            });
+        }
+        if (null != hashedWheelTimer) {
+            hashedWheelTimer.newTimeout(new TimerTask() {
+                @Override
+                public void run(final Timeout timeout) throws Exception {
+                    pollStatistics();
+                }
+            }, 3000, TimeUnit.MILLISECONDS);
+        }
+    }
 }