Fix DeviceFlowRegistry filling 34/41834/1
authorTomas Slusny <tomas.slusny@pantheon.sk>
Thu, 14 Jul 2016 14:01:54 +0000 (16:01 +0200)
committerTomas Slusny <tomas.slusny@pantheon.sk>
Thu, 14 Jul 2016 14:01:54 +0000 (16:01 +0200)
- Fixed double filling of DeviceFlowRegistry
- Moved nodeInstanceIdentifier parameter from DeviceFlowRegistry.fill
  to it's constructor

Change-Id: I11be668a80b98f40e0b9aaa863d616fe669c0921
Signed-off-by: Tomas Slusny <tomas.slusny@pantheon.sk>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/flow/DeviceFlowRegistry.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/LifecycleConductorImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/ServiceMocking.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImplTest.java

index 4b8314a2120d7956eb49259dfe4e928b3dbe1293..1e574f64592dd25dc7b439bca7a08777bf40548f 100644 (file)
@@ -24,7 +24,7 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
  */
 public interface DeviceFlowRegistry extends AutoCloseable {
 
-    ListenableFuture<List<Optional<FlowCapableNode>>> fill(KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier);
+    ListenableFuture<List<Optional<FlowCapableNode>>> fill();
 
     FlowDescriptor retrieveIdForFlow(FlowRegistryKey flowRegistryKey);
 
index c0ea15a01bcf8e2a30e88d09b984d1f8ca37681f..af2efaf21f8ea67b4589fd2b959614ea8cf6d8ba 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChang
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
@@ -164,44 +165,7 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
 
         if (OfpRole.BECOMEMASTER.equals(newRole)) {
             logText = "Start";
-
-            // Fill device flow registry with flows from datastore
-            final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
-                    deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
-
-            // Start statistics scheduling only after we finished initializing device flow registry
-            Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
-                @Override
-                public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
-                    if (LOG.isDebugEnabled()) {
-                        // Count all flows we read from datastore for debugging purposes.
-                        // This number do not always represent how many flows were actually added
-                        // to DeviceFlowRegistry, because of possible duplicates.
-                        long flowCount = Optional.fromNullable(result).asSet().stream()
-                                .flatMap(Collection::stream)
-                                .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
-                                .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
-                                .flatMap(table -> table.getFlow().stream())
-                                .count();
-
-                        LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
-                    }
-
-                    statisticsManager.startScheduling(deviceInfo);
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                    // If we manually cancelled this future, do not start scheduling of statistics
-                    if (deviceFlowRegistryFill.isCancelled()) {
-                        LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
-                    } else {
-                        LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
-                        statisticsManager.startScheduling(deviceInfo);
-                    }
-                }
-            });
-
+            fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
             MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
 
             if (rpcContext.isStatisticsRpcEnabled()) {
@@ -210,9 +174,6 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
                         deviceContext,
                         notificationPublishService);
             }
-
-            // Fill flow registry with flows found in operational and config datastore
-            deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
         } else {
             logText = "Stopp";
             statisticsManager.stopScheduling(deviceInfo);
@@ -243,6 +204,44 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
         });
     }
 
+    private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+        // Fill device flow registry with flows from datastore
+        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+        // Start statistics scheduling only after we finished initializing device flow registry
+        Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+            @Override
+            public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+                if (LOG.isDebugEnabled()) {
+                    // Count all flows we read from datastore for debugging purposes.
+                    // This number do not always represent how many flows were actually added
+                    // to DeviceFlowRegistry, because of possible duplicates.
+                    long flowCount = Optional.fromNullable(result).asSet().stream()
+                            .flatMap(Collection::stream)
+                            .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+                            .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                            .flatMap(table -> table.getFlow().stream())
+                            .count();
+
+                    LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
+                }
+
+                statisticsManager.startScheduling(deviceInfo);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // If we manually cancelled this future, do not start scheduling of statistics
+                if (deviceFlowRegistryFill.isCancelled()) {
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
+                } else {
+                    LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
+                    statisticsManager.startScheduling(deviceInfo);
+                }
+            }
+        });
+    }
+
     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
         return messageIntelligenceAgency;
     }
index d15216cd0eaf28cb1398286d9fded4ff536db8e4..e5f360cab87d5b113af655d1ce953d4df0217157 100644 (file)
@@ -149,7 +149,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         deviceInfo = primaryConnectionContext.getDeviceInfo();
         this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo, conductor);
         auxiliaryConnectionContexts = new HashMap<>();
-        deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker);
+        deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
         deviceGroupRegistry = new DeviceGroupRegistryImpl();
         deviceMeterRegistry = new DeviceMeterRegistryImpl();
         messageSpy = conductor.getMessageIntelligenceAgency();
index e744812c58df15337231b619959d620c91b01307..0f262fde40c3b50b12323bcad9b7203fef15c322 100644 (file)
@@ -52,6 +52,7 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
     @GuardedBy("marks")
     private final Collection<FlowRegistryKey> marks = new HashSet<>();
     private final DataBroker dataBroker;
+    private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier;
     private final List<ListenableFuture<List<Optional<FlowCapableNode>>>> lastFillFutures = new ArrayList<>();
 
     // Specifies what to do with flow read from datastore
@@ -68,12 +69,13 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
     };
 
 
-    public DeviceFlowRegistryImpl(final DataBroker dataBroker) {
+    public DeviceFlowRegistryImpl(final DataBroker dataBroker, final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
         this.dataBroker = dataBroker;
+        this.instanceIdentifier = instanceIdentifier;
     }
 
     @Override
-    public ListenableFuture<List<Optional<FlowCapableNode>>> fill(final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
+    public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
         LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier);
 
         // Prepare path for read transaction
index 4d0e6dceb3a4053045e0a8af3862e0e2a88254cd..c1ba409b27d33a43966bf16ae1a67d2eb21912bc 100644 (file)
@@ -87,10 +87,11 @@ public class LifecycleConductorImplTest {
 
     private NodeId nodeId = new NodeId("openflow-junit:1");
     private OfpRole ofpRole = OfpRole.NOCHANGE;
+    private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
 
     @Before
     public void setUp() {
-        final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
+        nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
 
         lifecycleConductor = new LifecycleConductorImpl(messageIntelligenceAgency);
         lifecycleConductor.setSafelyManager(deviceManager);
@@ -177,7 +178,7 @@ public class LifecycleConductorImplTest {
         final DataBroker dataBroker = mock(DataBroker.class);
 
         when(deviceContext.getDeviceState()).thenReturn(deviceState);
-        when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+        when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier));
         when(deviceManager.gainContext(deviceInfo)).thenReturn(deviceContext);
         when(deviceManager.onClusterRoleChange(deviceInfo, OfpRole.BECOMEMASTER)).thenReturn(listenableFuture);
         lifecycleConductor.roleChangeOnDevice(deviceInfo,OfpRole.BECOMEMASTER);
@@ -192,7 +193,7 @@ public class LifecycleConductorImplTest {
         final DataBroker dataBroker = mock(DataBroker.class);
 
         when(deviceContext.getDeviceState()).thenReturn(deviceState);
-        when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+        when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier));
         when(deviceManager.gainContext(deviceInfo)).thenReturn(deviceContext);
         when(deviceManager.onClusterRoleChange(deviceInfo, OfpRole.BECOMESLAVE)).thenReturn(listenableFuture);
 
index e80ce3e55e2ad9ad3798c46013cf3c35c6715a36..6ee4a3c4ce43d9f82b658fafddaee60ac9f77f29 100644 (file)
@@ -45,6 +45,7 @@ public class DeviceFlowRegistryImplTest {
     private DeviceFlowRegistryImpl deviceFlowRegistry;
     private FlowRegistryKey key;
     private FlowDescriptor descriptor;
+    private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
     @Mock
     private DataBroker dataBroker;
     @Mock
@@ -52,9 +53,10 @@ public class DeviceFlowRegistryImplTest {
 
     @Before
     public void setUp() throws Exception {
+        nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
         when(dataBroker.newReadOnlyTransaction()).thenReturn(readOnlyTransaction);
         when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
-        deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker);
+        deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier);
         final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(1).build();
         key = FlowRegistryKeyFactory.create(flowStats);
         descriptor = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:1"));
@@ -66,10 +68,9 @@ public class DeviceFlowRegistryImplTest {
 
     @Test
     public void testFill() throws Exception {
-        final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
         final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
 
-        deviceFlowRegistry.fill(nodeInstanceIdentifier).get();
+        deviceFlowRegistry.fill().get();
 
         verify(dataBroker, times(2)).newReadOnlyTransaction();
         verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
index c3e68af0f087a1f519fe598b7c7d35688b409ed6..85124809af545b8f3285809aaf562be266a3d852 100644 (file)
@@ -98,7 +98,7 @@ public abstract class ServiceMocking {
 
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimConnectionContext);
         when(mockedDeviceContext.getMessageSpy()).thenReturn(mockedMessagSpy);
-        when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+        when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, NODE_II));
         when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
         when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
         when(mockedDeviceContext.getMultiMsgCollector(Matchers.any())).thenReturn(multiMessageCollector);
index 44c3f3a864e7fd10129ccef928f6a9fa2339dd7c..2da9dc99ad86b3f70a67085b238036a99db3f7f5 100644 (file)
@@ -166,7 +166,7 @@ public class StatisticsManagerImplTest {
         when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimConnectionContext);
         when(mockedDeviceContext.getMessageSpy()).thenReturn(mockedMessagSpy);
-        when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+        when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodePath));
         when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
         when(mockedDeviceContext.getMultiMsgCollector(
                 Matchers.<RequestContext<List<MultipartReply>>>any())).thenAnswer(