Fix DeviceFlowRegistry filling
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
index c0ea15a01bcf8e2a30e88d09b984d1f8ca37681f..af2efaf21f8ea67b4589fd2b959614ea8cf6d8ba 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChang
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
@@ -164,44 +165,7 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
 
         if (OfpRole.BECOMEMASTER.equals(newRole)) {
             logText = "Start";
-
-            // Fill device flow registry with flows from datastore
-            final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
-                    deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
-
-            // Start statistics scheduling only after we finished initializing device flow registry
-            Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
-                @Override
-                public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
-                    if (LOG.isDebugEnabled()) {
-                        // Count all flows we read from datastore for debugging purposes.
-                        // This number do not always represent how many flows were actually added
-                        // to DeviceFlowRegistry, because of possible duplicates.
-                        long flowCount = Optional.fromNullable(result).asSet().stream()
-                                .flatMap(Collection::stream)
-                                .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
-                                .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
-                                .flatMap(table -> table.getFlow().stream())
-                                .count();
-
-                        LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
-                    }
-
-                    statisticsManager.startScheduling(deviceInfo);
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                    // If we manually cancelled this future, do not start scheduling of statistics
-                    if (deviceFlowRegistryFill.isCancelled()) {
-                        LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
-                    } else {
-                        LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
-                        statisticsManager.startScheduling(deviceInfo);
-                    }
-                }
-            });
-
+            fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
             MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
 
             if (rpcContext.isStatisticsRpcEnabled()) {
@@ -210,9 +174,6 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
                         deviceContext,
                         notificationPublishService);
             }
-
-            // Fill flow registry with flows found in operational and config datastore
-            deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
         } else {
             logText = "Stopp";
             statisticsManager.stopScheduling(deviceInfo);
@@ -243,6 +204,44 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
         });
     }
 
+    private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+        // Fill device flow registry with flows from datastore
+        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+        // Start statistics scheduling only after we finished initializing device flow registry
+        Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+            @Override
+            public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+                if (LOG.isDebugEnabled()) {
+                    // Count all flows we read from datastore for debugging purposes.
+                    // This number do not always represent how many flows were actually added
+                    // to DeviceFlowRegistry, because of possible duplicates.
+                    long flowCount = Optional.fromNullable(result).asSet().stream()
+                            .flatMap(Collection::stream)
+                            .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+                            .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                            .flatMap(table -> table.getFlow().stream())
+                            .count();
+
+                    LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
+                }
+
+                statisticsManager.startScheduling(deviceInfo);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // If we manually cancelled this future, do not start scheduling of statistics
+                if (deviceFlowRegistryFill.isCancelled()) {
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
+                } else {
+                    LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
+                    statisticsManager.startScheduling(deviceInfo);
+                }
+            }
+        });
+    }
+
     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
         return messageIntelligenceAgency;
     }