From 931430eff0440f2657822770ae72f84b51967eff Mon Sep 17 00:00:00 2001 From: "Vladimir Lavor.com" Date: Tue, 22 Dec 2015 10:48:53 +0100 Subject: [PATCH 1/1] GBP coexistence - OF renderer can remove only its own flows from node Signed-off-by: Vladimir Lavor.com Change-Id: Ibd52a8e494bbd845b215464f834fd557e38317fd --- .../renderer/ofoverlay/OFOverlayRenderer.java | 1 - .../renderer/ofoverlay/OfWriter.java | 76 +++++++++++-------- .../renderer/ofoverlay/PolicyManager.java | 35 +++++---- .../renderer/ofoverlay/MockPolicyManager.java | 2 +- .../renderer/ofoverlay/PolicyManagerTest.java | 24 +++--- 5 files changed, 77 insertions(+), 61 deletions(-) diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java index 6329432ee..6ca185f4d 100644 --- a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java +++ b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OFOverlayRenderer.java @@ -102,7 +102,6 @@ public class OFOverlayRenderer implements AutoCloseable, DataChangeListener { policyManager = new PolicyManager(dataProvider, switchManager, endpointManager, - rpcRegistry, executor, tableOffset); ofOverlayAug = new OfOverlayAug(dataProvider, epRendererAugmentationRegistry); diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OfWriter.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OfWriter.java index ecbde4453..5a8c77dd6 100755 --- a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OfWriter.java +++ b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/OfWriter.java @@ -13,6 +13,7 @@ import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtil import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -182,7 +183,18 @@ public class OfWriter { } } - public void commitToDataStore(DataBroker dataBroker) { + /** + * Update groups and flows on every node + * Only flows created by gbp - which are present in actualFlowMap - can be removed. It ensures no other flows + * are deleted + * Newly created flows are returned and will be used as actual in next update + * + * @param actualFlowMap map of flows which are currently present on all nodes + * @return map of newly created flows. These flows will be "actual" in next update + */ + public Map, TableBuilder> commitToDataStore(DataBroker dataBroker, + Map, TableBuilder> actualFlowMap) { + Map, TableBuilder> actualFlows = new HashMap<>(); if (dataBroker != null) { for (NodeId nodeId : groupIdsByNode.keySet()) { @@ -193,47 +205,51 @@ public class OfWriter { } } - for (Map.Entry, TableBuilder> entry : flowMap.entrySet()) { + for (Map.Entry, TableBuilder> newEntry : flowMap.entrySet()) { try { - /* - * Get the currently configured flows for - * this table. - */ - updateFlowTable(dataBroker, entry); + // Get actual flows on the same node/table + Map.Entry, TableBuilder> actualEntry = null; + for (Map.Entry, TableBuilder> a : actualFlowMap.entrySet()) { + if (a.getKey().equals(newEntry.getKey())) { + actualEntry = a; + } + } + // Get the currently configured flows for this table + updateFlowTable(dataBroker, newEntry, actualEntry); + actualFlows.put(newEntry.getKey(), newEntry.getValue()); } catch (Exception e) { - LOG.warn("Couldn't read flow table {}", entry.getKey()); + LOG.warn("Couldn't read flow table {}", newEntry.getKey()); } } } + return actualFlows; } - private void updateFlowTable(DataBroker dataBroker, - Map.Entry, TableBuilder> entry) + private void updateFlowTable(DataBroker dataBroker, Map.Entry, TableBuilder> desiredFlowMap, + Map.Entry, TableBuilder> actualFlowMap) throws ExecutionException, InterruptedException { - // flows to update - Set update = new HashSet<>(entry.getValue().getFlow()); - // flows currently in the table - Set curr = new HashSet<>(); - - final InstanceIdentifier tableIid = entry.getKey(); - ReadWriteTransaction t = dataBroker.newReadWriteTransaction(); - Optional
r = t.read(LogicalDatastoreType.CONFIGURATION, tableIid).get(); - if (r.isPresent()) { - Table currentTable = r.get(); - curr = new HashSet<>(currentTable.getFlow()); + // Actual state + List actualFlows = new ArrayList<>(); + if (actualFlowMap != null && actualFlowMap.getValue() != null) { + actualFlows = actualFlowMap.getValue().getFlow(); } + // New state + List desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow()); // Sets with custom equivalence rules - Set> oldFlows = new HashSet<>( - Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); - Set> updatedFlows = new HashSet<>( - Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); - - // what is still there but was not updated, needs to be deleted - Sets.SetView> deletions = Sets.difference(oldFlows, updatedFlows); - // new flows (they were not there before) - Sets.SetView> additions = Sets.difference(updatedFlows, oldFlows); + Set> wrappedActualFlows = new HashSet<>( + Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); + Set> wrappedDesiredFlows = new HashSet<>( + Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); + + // All gbp flows which are not updated will be removed + Sets.SetView> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows); + // New flows (they were not there before) + Sets.SetView> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows); + + final InstanceIdentifier
tableIid = desiredFlowMap.getKey(); + ReadWriteTransaction t = dataBroker.newReadWriteTransaction(); if (!deletions.isEmpty()) { for (Equivalence.Wrapper wf : deletions) { diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java index 2e4031272..d0277afa8 100755 --- a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java +++ b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java @@ -12,7 +12,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -27,7 +29,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.groupbasedpolicy.dto.EgKey; import org.opendaylight.groupbasedpolicy.dto.EpKey; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager; @@ -47,6 +48,7 @@ import org.opendaylight.groupbasedpolicy.util.DataStoreHelper; import org.opendaylight.groupbasedpolicy.util.IidFactory; import org.opendaylight.groupbasedpolicy.util.SingletonTask; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder; @@ -76,6 +78,9 @@ public class PolicyManager private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class); + private Map, TableBuilder> actualGbpFlows = new HashMap<>(); + private Map, TableBuilder> previousGbpFlows = new HashMap<>(); + private short tableOffset; private static final short TABLEID_PORTSECURITY = 0; private static final short TABLEID_INGRESS_NAT = 1; @@ -103,7 +108,6 @@ public class PolicyManager public PolicyManager(DataBroker dataBroker, SwitchManager switchManager, EndpointManager endpointManager, - RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor, short tableOffset) { super(); @@ -192,7 +196,7 @@ public class PolicyManager for (Short tableId : tableIDs) { for (NodeId nodeId : switchManager.getReadySwitches()) { final InstanceIdentifier
tablePath = FlowUtils.createTablePath(nodeId, tableId); - checkList.add(deteleTableIfExists(rwTx, tablePath)); + checkList.add(deleteTableIfExists(rwTx, tablePath)); } } ListenableFuture> allAsListFuture = Futures.allAsList(checkList); @@ -217,7 +221,7 @@ public class PolicyManager return tableIds; } - private ListenableFuture deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier
tablePath){ + private ListenableFuture deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier
tablePath){ return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function, Void>() { @Override @@ -345,6 +349,11 @@ public class PolicyManager private void scheduleUpdate() { if (switchManager != null) { LOG.trace("Scheduling flow update task"); + + // Mark all existing flows as previous - will be compared with new ones + previousGbpFlows = actualGbpFlows; + actualGbpFlows = new HashMap<>(); + flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS); } } @@ -390,21 +399,19 @@ public class PolicyManager CompletionService ecs = new ExecutorCompletionService<>(executor); - int n = 0; OfWriter ofWriter = new OfWriter(); SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter); ecs.submit(swut); - n+=1; - - for (int i = 0; i < n; i++) { - try { - ecs.take().get(); - ofWriter.commitToDataStore(dataBroker); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to update flow tables", e); - } + + try { + ecs.take().get(); + // Current gbp flow must be independent, find out where this run() ends, + // set flows to one field and reset another + actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows)); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to update flow tables", e); } LOG.debug("Flow update completed"); } diff --git a/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/MockPolicyManager.java b/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/MockPolicyManager.java index 177872747..0d414435d 100644 --- a/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/MockPolicyManager.java +++ b/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/MockPolicyManager.java @@ -14,7 +14,7 @@ public class MockPolicyManager extends PolicyManager { private static short offSet = 0; public MockPolicyManager(EndpointManager endpointManager) { - super(null, null, endpointManager, null, null, offSet); + super(null, null, endpointManager, null, offSet); } } diff --git a/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManagerTest.java b/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManagerTest.java index 392ad396f..0cae3198b 100644 --- a/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManagerTest.java +++ b/renderers/ofoverlay/src/test/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManagerTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.HashMap; import java.util.concurrent.ScheduledExecutorService; import org.junit.Before; @@ -28,13 +29,13 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.groupbasedpolicy.dto.EgKey; import org.opendaylight.groupbasedpolicy.dto.EpKey; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.L2ContextId; @@ -44,13 +45,12 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.ListenableFuture; public class PolicyManagerTest { // constant values used by the tested class implementation private static final short TABLEID_PORTSECURITY = 0; - private static final short TABLEID_INGRESS_NAT = 1; + private static final short TABLEID_INGRESS_NAT = 1; private static final short TABLEID_SOURCE_MAPPER = 2; private static final short TABLEID_DESTINATION_MAPPER = 3; private static final short TABLEID_POLICY_ENFORCER = 4; @@ -61,12 +61,8 @@ public class PolicyManagerTest { private DataBroker dataBroker; private SwitchManager switchManager; - private EndpointManager endpointManager; - private RpcProviderRegistry rpcRegistry; - private ScheduledExecutorService executor; private short tableOffset; - private WriteTransaction writeTransaction; private ReadWriteTransaction readWriteTransaction; private NodeId nodeId; @@ -75,21 +71,20 @@ public class PolicyManagerTest { @Before public void setUp() { + EndpointManager endpointManager = mock(EndpointManager.class); + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); dataBroker = mock(DataBroker.class); switchManager = mock(SwitchManager.class); - endpointManager = mock(EndpointManager.class); - rpcRegistry = mock(RpcProviderRegistry.class); - executor = mock(ScheduledExecutorService.class); tableOffset = 5; - writeTransaction = mock(WriteTransaction.class); + WriteTransaction writeTransaction = mock(WriteTransaction.class); when(dataBroker.newWriteOnlyTransaction()).thenReturn(writeTransaction); readWriteTransaction = mock(ReadWriteTransaction.class); when(dataBroker.newReadWriteTransaction()).thenReturn(readWriteTransaction); manager = new PolicyManager(dataBroker, switchManager, - endpointManager, rpcRegistry, executor, tableOffset); + endpointManager, executor, tableOffset); nodeId = mock(NodeId.class); tableId = 5; @@ -114,10 +109,9 @@ public class PolicyManagerTest { CheckedFuture submitFuture = mock(CheckedFuture.class); when(readWriteTransaction.submit()).thenReturn(submitFuture); - flowMap.commitToDataStore(dataBroker); + flowMap.commitToDataStore(dataBroker, new HashMap, TableBuilder>()); InOrder orderCheck = inOrder(readWriteTransaction); - orderCheck.verify(readWriteTransaction).read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class)); orderCheck.verify(readWriteTransaction).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Flow.class), any(Boolean.class)); orderCheck.verify(readWriteTransaction).submit(); @@ -126,7 +120,7 @@ public class PolicyManagerTest { @Test public void changeOpenFlowTableOffsetTest() throws Exception { short tableOffset = 3; - assertTrue(manager.changeOpenFlowTableOffset(tableOffset) instanceof ListenableFuture); + assertTrue(manager.changeOpenFlowTableOffset(tableOffset) != null); verify(switchManager, times(7)).getReadySwitches(); } -- 2.36.6