GBP coexistence - OF renderer can remove only its own flows from node 03/32403/4
author Vladimir Lavor <vlavor@cisco.com>
Tue, 22 Dec 2015 09:48:53 +0000 (10:48 +0100)
committer Martin Sunal <msunal@cisco.com>
Thu, 14 Jan 2016 16:19:28 +0000 (16:19 +0000)
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Change-Id: Ibd52a8e494bbd845b215464f834fd557e38317fd

renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OfWriter.java
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/MockPolicyManager.java
renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManagerTest.java

index 6329432ee10eefd82026cae67fd1d3bc683fc2ab..6ca185f4df633333ab03982db91b80c73fdb08a2 100644 (file)
@@ -102,7 +102,6 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener {
         policyManager = new PolicyManager(dataProvider,
                 switchManager,
                 endpointManager,
-                rpcRegistry,
                 executor,
                 tableOffset);
         ofOverlayAug = new OfOverlayAug(dataProvider, epRendererAugmentationRegistry);
index ecbde4453f59b1176d8c0568bd0e3f25c58d843f..5a8c77dd6c775ade607f0eb398c86a01042b5cba 100755 (executable)
@@ -13,6 +13,7 @@ import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtil
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -182,7 +183,18 @@ public class OfWriter {
         }
     }
 
-    public void commitToDataStore(DataBroker dataBroker) {
+    /**
+     * Updates groups and flows on every node.
+     * Only flows created by GBP - i.e. those present in actualFlowMap - can be removed, which ensures
+     * that no other flows are deleted.
+     * Newly created flows are returned and serve as the "actual" flows in the next update.
+     *
+     * @param actualFlowMap map of flows currently present on all nodes
+     * @return map of newly created flows; these will be the "actual" flows in the next update
+     */
+    public Map<InstanceIdentifier<Table>, TableBuilder> commitToDataStore(DataBroker dataBroker,
+                                                                          Map<InstanceIdentifier<Table>, TableBuilder> actualFlowMap) {
+        Map<InstanceIdentifier<Table>, TableBuilder> actualFlows = new HashMap<>();
         if (dataBroker != null) {
 
             for (NodeId nodeId : groupIdsByNode.keySet()) {
@@ -193,47 +205,51 @@ public class OfWriter {
                 }
             }
 
-            for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
+            for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> newEntry : flowMap.entrySet()) {
                 try {
-                    /*
-                     * Get the currently configured flows for
-                     * this table.
-                     */
-                    updateFlowTable(dataBroker, entry);
+                    // Get actual flows on the same node/table
+                    Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualEntry = null;
+                    for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> a : actualFlowMap.entrySet()) {
+                        if (a.getKey().equals(newEntry.getKey())) {
+                            actualEntry = a;
+                        }
+                    }
+                    // Update this table, diffing against the GBP-owned flows found above
+                    updateFlowTable(dataBroker, newEntry, actualEntry);
+                    actualFlows.put(newEntry.getKey(), newEntry.getValue());
                 } catch (Exception e) {
-                    LOG.warn("Couldn't read flow table {}", entry.getKey());
+                    LOG.warn("Couldn't read flow table {}", newEntry.getKey());
                 }
             }
         }
+        return actualFlows;
     }
 
-    private void updateFlowTable(DataBroker dataBroker,
-            Map.Entry<InstanceIdentifier<Table>, TableBuilder> entry)
+    private void updateFlowTable(DataBroker dataBroker, Map.Entry<InstanceIdentifier<Table>, TableBuilder> desiredFlowMap,
+                                 Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualFlowMap)
             throws ExecutionException, InterruptedException {
-        // flows to update
-        Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
-        // flows currently in the table
-        Set<Flow> curr = new HashSet<>();
-
-        final InstanceIdentifier<Table> tableIid = entry.getKey();
-        ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
-        Optional<Table> r = t.read(LogicalDatastoreType.CONFIGURATION, tableIid).get();
 
-        if (r.isPresent()) {
-            Table currentTable = r.get();
-            curr = new HashSet<>(currentTable.getFlow());
+        // Actual state
+        List<Flow> actualFlows = new ArrayList<>();
+        if (actualFlowMap != null && actualFlowMap.getValue() != null) {
+            actualFlows = actualFlowMap.getValue().getFlow();
         }
+        // New state
+        List<Flow> desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow());
 
         // Sets with custom equivalence rules
-        Set<Equivalence.Wrapper<Flow>> oldFlows = new HashSet<>(
-                Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-        Set<Equivalence.Wrapper<Flow>> updatedFlows = new HashSet<>(
-                Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
-        // what is still there but was not updated, needs to be deleted
-        Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(oldFlows, updatedFlows);
-        // new flows (they were not there before)
-        Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(updatedFlows, oldFlows);
+        Set<Equivalence.Wrapper<Flow>> wrappedActualFlows = new HashSet<>(
+                Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+        Set<Equivalence.Wrapper<Flow>> wrappedDesiredFlows = new HashSet<>(
+                Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+
+        // GBP flows which were not re-created in this update will be removed
+        Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows);
+        // New flows (they were not there before)
+        Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows);
+
+        final InstanceIdentifier<Table> tableIid = desiredFlowMap.getKey();
+        ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
 
         if (!deletions.isEmpty()) {
             for (Equivalence.Wrapper<Flow> wf : deletions) {
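The reconciliation in the hunk above boils down to two Guava set differences over equivalence-wrapped flows: whatever sits in the "actual" (GBP-owned) map but not in the desired map is deleted, and whatever is desired but not yet present is added. A minimal standalone sketch of the same pattern, with String standing in for Flow and a hand-rolled Equivalence standing in for the renderer's EquivalenceFabric (both are illustrative stand-ins, not the actual types):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import com.google.common.base.Equivalence;
    import com.google.common.collect.Collections2;
    import com.google.common.collect.Sets;

    public class FlowDiffSketch {

        // Stand-in for EquivalenceFabric: decides when two "flows" count as
        // the same, independently of Object.equals().
        private static final Equivalence<String> FLOW_EQUIVALENCE = new Equivalence<String>() {
            @Override
            protected boolean doEquivalent(String a, String b) {
                return a.equalsIgnoreCase(b);
            }

            @Override
            protected int doHash(String flow) {
                return flow.toLowerCase().hashCode();
            }
        };

        public static void main(String[] args) {
            // Flows previously written by GBP (the "actual" map) and the
            // newly computed desired state.
            List<String> actualFlows = Arrays.asList("arp", "icmp", "ssh");
            List<String> desiredFlows = Arrays.asList("ARP", "icmp", "http");

            Set<Equivalence.Wrapper<String>> wrappedActual = new HashSet<>(
                    Collections2.transform(actualFlows, FLOW_EQUIVALENCE::wrap));
            Set<Equivalence.Wrapper<String>> wrappedDesired = new HashSet<>(
                    Collections2.transform(desiredFlows, FLOW_EQUIVALENCE::wrap));

            // GBP-owned flows no longer desired -> delete. Flows GBP never
            // wrote are absent from wrappedActual, so they can never land here.
            Sets.SetView<Equivalence.Wrapper<String>> deletions =
                    Sets.difference(wrappedActual, wrappedDesired);
            // Desired flows not yet present -> add.
            Sets.SetView<Equivalence.Wrapper<String>> additions =
                    Sets.difference(wrappedDesired, wrappedActual);

            System.out.println("delete: " + deletions); // contains the wrapper for "ssh"
            System.out.println("add:    " + additions); // contains the wrapper for "http"
        }
    }

"ARP" and "arp" are equivalent under this rule, so they show up in neither view; this is exactly why the wrapping matters, since plain Set.equals() semantics would treat them as different flows.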
index 2e40312724734188c231fee156b3aa48f731c2d3..d0277afa8c9e7a4ee520f2fc9fa07a9d826bca9c 100755 (executable)
@@ -12,7 +12,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -27,7 +29,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.dto.EgKey;
 import org.opendaylight.groupbasedpolicy.dto.EpKey;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
@@ -47,6 +48,7 @@ import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
 import org.opendaylight.groupbasedpolicy.util.IidFactory;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
@@ -76,6 +78,9 @@ public class PolicyManager
     private static final Logger LOG =
             LoggerFactory.getLogger(PolicyManager.class);
 
+    private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
+    private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows = new HashMap<>();
+
     private short tableOffset;
     private static final short TABLEID_PORTSECURITY = 0;
     private static final short TABLEID_INGRESS_NAT =  1;
@@ -103,7 +108,6 @@ public class PolicyManager
     public PolicyManager(DataBroker dataBroker,
                          SwitchManager switchManager,
                          EndpointManager endpointManager,
-                         RpcProviderRegistry rpcRegistry,
                          ScheduledExecutorService executor,
                          short tableOffset) {
         super();
@@ -192,7 +196,7 @@ public class PolicyManager
         for (Short tableId : tableIDs) {
             for (NodeId nodeId : switchManager.getReadySwitches()) {
                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
-                checkList.add(deteleTableIfExists(rwTx, tablePath));
+                checkList.add(deleteTableIfExists(rwTx, tablePath));
             }
         }
         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
@@ -217,7 +221,7 @@ public class PolicyManager
         return tableIds;
     }
 
-    private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+    private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
 
         @Override
@@ -345,6 +349,11 @@ public class PolicyManager
     private void scheduleUpdate() {
         if (switchManager != null) {
             LOG.trace("Scheduling flow update task");
+
+            // Mark all existing flows as previous; they will be compared with the new ones
+            previousGbpFlows = actualGbpFlows;
+            actualGbpFlows = new HashMap<>();
+
             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
         }
     }
@@ -390,21 +399,19 @@ public class PolicyManager
 
             CompletionService<Void> ecs
                 = new ExecutorCompletionService<>(executor);
-            int n = 0;
 
             OfWriter ofWriter = new OfWriter();
 
             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
             ecs.submit(swut);
-            n+=1;
-
-            for (int i = 0; i < n; i++) {
-                try {
-                    ecs.take().get();
-                    ofWriter.commitToDataStore(dataBroker);
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to update flow tables", e);
-                }
+
+            try {
+                ecs.take().get();
+                // Store the flows committed in this run as the new "actual" state;
+                // they were diffed against previousGbpFlows captured in scheduleUpdate()
+                actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Failed to update flow tables", e);
             }
             LOG.debug("Flow update completed");
         }
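The bookkeeping added to PolicyManager is a two-field handover: scheduleUpdate() demotes the currently installed flows to "previous", and the finished update task stores whatever commitToDataStore() actually wrote. A stand-alone sketch under that reading, with String keys standing in for InstanceIdentifier<Table> and TableBuilder (simplified stand-ins, not the real types):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the state handover between flow updates; the field names
    // mirror the patch, the key/value types do not.
    public class FlowStateHandoverSketch {

        private Map<String, String> actualGbpFlows = new HashMap<>();
        private Map<String, String> previousGbpFlows = new HashMap<>();

        // Called when an update is scheduled: everything installed so far
        // becomes "previous" and will be diffed against the fresh flows.
        void scheduleUpdate() {
            previousGbpFlows = actualGbpFlows;
            actualGbpFlows = new HashMap<>();
        }

        // Called when the update task completes: the flows the commit
        // returned become the baseline for the next round.
        void updateFinished(Map<String, String> committedFlows) {
            actualGbpFlows.putAll(committedFlows);
        }
    }

Because the baseline only ever contains flows GBP itself committed, flows installed by other applications on the same node never appear in the "previous" map and therefore can never be selected for deletion.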
index 177872747cff25513f81fc7c54f1c9e9004987ec..0d414435d5e31f3fd70c1eaaca1e8f2f8fecde7e 100644 (file)
@@ -14,7 +14,7 @@ public class MockPolicyManager extends PolicyManager {
 
     private static short offSet = 0;
     public MockPolicyManager(EndpointManager endpointManager) {
-        super(null, null, endpointManager, null, null, offSet);
+        super(null, null, endpointManager, null, offSet);
     }
 
 }
index 392ad396f3c80bef4cdf3c769303de7f11d3a80f..0cae3198be2c1adccd12b4fec8ed442d688400fe 100644 (file)
@@ -17,6 +17,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.junit.Before;
@@ -28,13 +29,13 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.dto.EgKey;
 import org.opendaylight.groupbasedpolicy.dto.EpKey;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2ContextId;
@@ -44,13 +45,12 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 
 public class PolicyManagerTest {
 
     // constant values used by the tested class implementation
     private static final short TABLEID_PORTSECURITY = 0;
-    private static final short TABLEID_INGRESS_NAT =  1;
+    private static final short TABLEID_INGRESS_NAT = 1;
     private static final short TABLEID_SOURCE_MAPPER = 2;
     private static final short TABLEID_DESTINATION_MAPPER = 3;
     private static final short TABLEID_POLICY_ENFORCER = 4;
@@ -61,12 +61,8 @@ public class PolicyManagerTest {
 
     private DataBroker dataBroker;
     private SwitchManager switchManager;
-    private EndpointManager endpointManager;
-    private RpcProviderRegistry rpcRegistry;
-    private ScheduledExecutorService executor;
     private short tableOffset;
 
-    private WriteTransaction writeTransaction;
     private ReadWriteTransaction readWriteTransaction;
 
     private NodeId nodeId;
@@ -75,21 +71,20 @@ public class PolicyManagerTest {
 
     @Before
     public void setUp() {
+        EndpointManager endpointManager = mock(EndpointManager.class);
+        ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
         dataBroker = mock(DataBroker.class);
         switchManager = mock(SwitchManager.class);
-        endpointManager = mock(EndpointManager.class);
-        rpcRegistry = mock(RpcProviderRegistry.class);
-        executor = mock(ScheduledExecutorService.class);
         tableOffset = 5;
 
-        writeTransaction = mock(WriteTransaction.class);
+        WriteTransaction writeTransaction = mock(WriteTransaction.class);
         when(dataBroker.newWriteOnlyTransaction()).thenReturn(writeTransaction);
 
         readWriteTransaction = mock(ReadWriteTransaction.class);
         when(dataBroker.newReadWriteTransaction()).thenReturn(readWriteTransaction);
 
         manager = new PolicyManager(dataBroker, switchManager,
-                endpointManager, rpcRegistry, executor, tableOffset);
+                endpointManager, executor, tableOffset);
 
         nodeId = mock(NodeId.class);
         tableId = 5;
@@ -114,10 +109,9 @@ public class PolicyManagerTest {
         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = mock(CheckedFuture.class);
         when(readWriteTransaction.submit()).thenReturn(submitFuture);
 
-        flowMap.commitToDataStore(dataBroker);
+        flowMap.commitToDataStore(dataBroker, new HashMap<InstanceIdentifier<Table>, TableBuilder>());
 
         InOrder orderCheck = inOrder(readWriteTransaction);
-        orderCheck.verify(readWriteTransaction).read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class));
         orderCheck.verify(readWriteTransaction).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class),
                 any(Flow.class), any(Boolean.class));
         orderCheck.verify(readWriteTransaction).submit();
@@ -126,7 +120,7 @@ public class PolicyManagerTest {
     @Test
     public void changeOpenFlowTableOffsetTest() throws Exception {
         short tableOffset = 3;
-        assertTrue(manager.changeOpenFlowTableOffset(tableOffset) instanceof ListenableFuture<?>);
+        assertTrue(manager.changeOpenFlowTableOffset(tableOffset) != null);
         verify(switchManager, times(7)).getReadySwitches();
     }