- if (OfpRole.BECOMEMASTER.equals(newRole)) {
- statisticsManager.startScheduling(deviceContext.getPrimaryConnectionContext().getDeviceInfo());
- } else {
- statisticsManager.stopScheduling(deviceContext.getPrimaryConnectionContext().getDeviceInfo());
- }
+ }
+
+ private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+ // Fill device flow registry with flows from datastore
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+ // Start statistics scheduling only after we finished initializing device flow registry
+ Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .flatMap(table -> table.getFlow().stream())
+ .count();