Bug 3281 duplicate flow programming 31/19631/9
authorKonstantin Blagov <kblagov@cisco.com>
Tue, 5 May 2015 14:30:13 +0000 (16:30 +0200)
committerKonstantin Blagov <kblagov@cisco.com>
Thu, 21 May 2015 14:32:30 +0000 (16:32 +0200)
implemented Guava's Equivalence for Flow

Change-Id: I35dab16646d4da638bb8ebfe93cedea6659c1874
Signed-off-by: Konstantin Blagov <kblagov@cisco.com>
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java [changed mode: 0644->0755]
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/EquivalenceFabric.java [new file with mode: 0755]
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/FlowEquivalence.java [new file with mode: 0755]
renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/MatchEquivalence.java [new file with mode: 0755]

old mode 100644 (file)
new mode 100755 (executable)
index 284028d..376cf27
@@ -8,20 +8,15 @@
 
 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Equivalence;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -29,6 +24,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
@@ -37,8 +33,8 @@ import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
@@ -57,13 +53,19 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Manage policies on switches by subscribing to updates from the
@@ -95,8 +97,6 @@ public class PolicyManager
      */
     private final static int FLOW_UPDATE_DELAY = 250;
 
-
-
     public PolicyManager(DataBroker dataBroker,
                          PolicyResolver policyResolver,
                          SwitchManager switchManager,
@@ -226,9 +226,16 @@ public class PolicyManager
             return this.flowMap.get(tableIid);
         }
 
-        public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
+        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
-            if (!tableBuilder.getFlow().contains(flow)) {
+            // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
+            List<Flow> flows = tableBuilder.getFlow();
+            Set<Equivalence.Wrapper<Flow>> wrappedFlows =
+                    new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+
+            Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
+
+            if (!wrappedFlows.contains(wFlow)) {
                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
             }
         }
@@ -251,29 +258,49 @@ public class PolicyManager
 
         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
                                      TableBuilder> entry)  throws Exception {
-            Set<Flow> update = new HashSet<Flow>(entry.getValue().getFlow());
-            Set<Flow> curr = new HashSet<Flow>();
+            // flows to update
+            Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
+            // flows currently in the table
+            Set<Flow> curr = new HashSet<>();
 
             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
             Optional<Table> r =
                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
 
             if (r.isPresent()) {
-                Table curTable = r.get();
-                curr = new HashSet<Flow>(curTable.getFlow());
+                Table currentTable = r.get();
+                curr = new HashSet<>(currentTable.getFlow());
             }
-            Sets.SetView<Flow> deletions = Sets.difference(curr, update);
-            Sets.SetView<Flow> additions = Sets.difference(update, curr);
+
+            // Sets with custom equivalence rules
+            Set<Equivalence.Wrapper<Flow>> oldFlows =
+                    new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+            Set<Equivalence.Wrapper<Flow>> updatedFlows =
+                    new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
+
+            // what is still there but was not updated, needs to be deleted
+            Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
+                    Sets.difference(oldFlows, updatedFlows);
+            // new flows (they were not there before)
+            Sets.SetView<Equivalence.Wrapper<Flow>> additions =
+                    Sets.difference(updatedFlows, oldFlows);
+
             if (!deletions.isEmpty()) {
-                for (Flow f: deletions) {
-                    t.delete(LogicalDatastoreType.CONFIGURATION,
-                             FlowUtils.createFlowPath(entry.getKey(), f.getId()));
+                for (Equivalence.Wrapper<Flow> wf: deletions) {
+                    Flow f = wf.get();
+                    if (f != null) {
+                        t.delete(LogicalDatastoreType.CONFIGURATION,
+                                FlowUtils.createFlowPath(entry.getKey(), f.getId()));
+                    }
                 }
             }
             if (!additions.isEmpty()) {
-                for (Flow f: additions) {
-                    t.put(LogicalDatastoreType.CONFIGURATION,
-                          FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
+                for (Equivalence.Wrapper<Flow> wf: additions) {
+                    Flow f = wf.get();
+                    if (f != null) {
+                        t.put(LogicalDatastoreType.CONFIGURATION,
+                                FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
+                    }
                 }
             }
             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
@@ -314,7 +341,7 @@ public class PolicyManager
             });
         }
 
-     }
+    }
 
     private void scheduleUpdate() {
         if (switchManager != null) {
@@ -363,7 +390,7 @@ public class PolicyManager
             LOG.debug("Beginning flow update task");
 
             CompletionService<Void> ecs
-                = new ExecutorCompletionService<Void>(executor);
+                = new ExecutorCompletionService<>(executor);
             int n = 0;
 
             FlowMap flowMap = new FlowMap();
diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/EquivalenceFabric.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/EquivalenceFabric.java
new file mode 100755 (executable)
index 0000000..c7e33ef
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence;
+
+import com.google.common.base.Equivalence;
+import com.google.common.base.Function;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple fabric for equivalence rules
+ * and for functions used in converting Lists to Sets with our own equivalence rules
+ *
+ */
+public class EquivalenceFabric {
+
+    private EquivalenceFabric(){
+        throw new UnsupportedOperationException("Can not create an instance");
+    }
+
+    public static final FlowEquivalence FLOW_EQUIVALENCE = new FlowEquivalence();
+    public static final Function<Flow, Equivalence.Wrapper<Flow>> FLOW_WRAPPER_FUNCTION =
+            new Function<Flow, Equivalence.Wrapper<Flow>>() {
+                @Nullable
+                @Override
+                public Equivalence.Wrapper<Flow> apply(@Nullable Flow input) {
+                    return FLOW_EQUIVALENCE.wrap(input);
+                }
+            };
+
+    public static final MatchEquivalence MATCH_EQUIVALENCE = new MatchEquivalence();
+
+}
diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/FlowEquivalence.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/FlowEquivalence.java
new file mode 100755 (executable)
index 0000000..08a3cdd
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence;
+
+import com.google.common.base.Equivalence;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Custom Equivalence for Flow
+ *
+ */
+public class FlowEquivalence extends Equivalence<Flow> {
+
+    FlowEquivalence() {
+    }
+
+    @Override
+    protected boolean doEquivalent(Flow a, Flow b) {
+
+        if (!Objects.equals(a.getBufferId(), b.getBufferId())) {
+            return false;
+        }
+        if (!Objects.equals(a.getContainerName(), b.getContainerName())) {
+            return false;
+        }
+        if (!Objects.equals(a.getCookie(), b.getCookie())) {
+            return false;
+        }
+        if (!Objects.equals(a.getCookieMask(), b.getCookieMask())) {
+            return false;
+        }
+        if (!Objects.equals(a.getFlags(), b.getFlags())) {
+            return false;
+        }
+        if (!Objects.equals(a.getFlowName(), b.getFlowName())) {
+            return false;
+        }
+
+        List<Instruction> listA = new ArrayList<>();
+        if (a.getInstructions() != null) {
+            listA = a.getInstructions().getInstruction();
+        }
+        Set<Instruction> setA = new HashSet<>();
+        if (listA != null) {
+            setA = new HashSet<>(listA);
+        }
+        List<Instruction> listB = new ArrayList<>();
+        if (a.getInstructions() != null) {
+            listB = b.getInstructions().getInstruction();
+        }
+        Set<Instruction> setB = new HashSet<>();
+        if (listB != null) {
+            setB = new HashSet<>(listB);
+        }
+        if (!setA.equals(setB)) {
+            return false;
+        }
+
+        if (!EquivalenceFabric.MATCH_EQUIVALENCE
+                .equivalent(a.getMatch(), b.getMatch())) {
+            return false;
+        }
+        if (!Objects.equals(a.getOutGroup(), b.getOutGroup())) {
+            return false;
+        }
+        if (!Objects.equals(a.getOutPort(), b.getOutPort())) {
+            return false;
+        }
+        if (!Objects.equals(a.getPriority(), b.getPriority())) {
+            return false;
+        }
+        if (!Objects.equals(a.getTableId(), b.getTableId())) {
+            return false;
+        }
+        if (!Objects.equals(a.isBarrier(), b.isBarrier())) {
+            return false;
+        }
+        if (!Objects.equals(a.isInstallHw(), b.isInstallHw())) {
+            return false;
+        }
+        if (!Objects.equals(a.isStrict(), b.isStrict())) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    protected int doHash(Flow flow) {
+        final int prime = 31;
+        int result = 1;
+
+        result = prime * result + ((flow.getBufferId() == null) ? 0 : flow.getBufferId().hashCode());
+        result = prime * result + ((flow.getContainerName() == null) ? 0 : flow.getContainerName().hashCode());
+        result = prime * result + ((flow.getCookie() == null) ? 0 : flow.getCookie().hashCode());
+        result = prime * result + ((flow.getCookieMask() == null) ? 0 : flow.getCookieMask().hashCode());
+        result = prime * result + ((flow.getFlags() == null) ? 0 : flow.getFlags().hashCode());
+        result = prime * result + ((flow.getFlowName() == null) ? 0 : flow.getFlowName().hashCode());
+
+        if (flow.getInstructions() != null
+                && flow.getInstructions().getInstruction() != null
+                && !flow.getInstructions().getInstruction().isEmpty()) {
+            Set<Instruction> instructions = new HashSet<>(flow.getInstructions().getInstruction());
+            result = prime * result + instructions.hashCode();
+        }
+
+        result = prime * result + ((flow.getMatch() == null) ? 0
+                : EquivalenceFabric.MATCH_EQUIVALENCE.wrap(flow.getMatch()).hashCode());
+        result = prime * result + ((flow.getOutGroup() == null) ? 0 : flow.getOutGroup().hashCode());
+        result = prime * result + ((flow.getOutPort() == null) ? 0 : flow.getOutPort().hashCode());
+        result = prime * result + ((flow.getPriority() == null) ? 0 : flow.getPriority().hashCode());
+        result = prime * result + ((flow.getTableId() == null) ? 0 : flow.getTableId().hashCode());
+        result = prime * result + ((flow.isBarrier() == null) ? 0 : flow.isBarrier().hashCode());
+        result = prime * result + ((flow.isInstallHw() == null) ? 0 : flow.isInstallHw().hashCode());
+        result = prime * result + ((flow.isStrict() == null) ? 0 : flow.isStrict().hashCode());
+
+        return result;
+    }
+}
diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/MatchEquivalence.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/equivalence/MatchEquivalence.java
new file mode 100755 (executable)
index 0000000..f75962f
--- /dev/null
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence;
+
+import com.google.common.base.Equivalence;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.general.rev140714.GeneralAugMatchNodesNodeTableFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.general.rev140714.general.extension.list.grouping.ExtensionList;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Custom Equivalence for Match
+ *
+ * @see FlowEquivalence
+ */
+public class MatchEquivalence extends Equivalence<Match> {
+
+    MatchEquivalence() {
+    }
+
+    @Override
+    protected boolean doEquivalent(Match a, Match b) {
+
+        if (!Objects.equals(a.getEthernetMatch(), b.getEthernetMatch())) {
+            return false;
+        }
+        if (!Objects.equals(a.getIcmpv4Match(), b.getIcmpv4Match())) {
+            return false;
+        }
+        if (!Objects.equals(a.getIcmpv6Match(), b.getIcmpv6Match())) {
+            return false;
+        }
+        if (!Objects.equals(a.getInPhyPort(), b.getInPhyPort())) {
+            return false;
+        }
+        if (!Objects.equals(a.getInPort(), b.getInPort())) {
+            return false;
+        }
+        if (!Objects.equals(a.getIpMatch(), b.getIpMatch())) {
+            return false;
+        }
+        if (!Objects.equals(a.getLayer3Match(), b.getLayer3Match())) {
+            return false;
+        }
+        if (!Objects.equals(a.getLayer4Match(), b.getLayer4Match())) {
+            return false;
+        }
+        if (!Objects.equals(a.getMetadata(), b.getMetadata())) {
+            return false;
+        }
+        if (!Objects.equals(a.getProtocolMatchFields(), b.getProtocolMatchFields())) {
+            return false;
+        }
+        if (!Objects.equals(a.getTcpFlagMatch(), b.getTcpFlagMatch())) {
+            return false;
+        }
+        if (!Objects.equals(a.getTunnel(), b.getTunnel())) {
+            return false;
+        }
+        if (!Objects.equals(a.getVlanMatch(), b.getVlanMatch())) {
+            return false;
+        }
+        GeneralAugMatchNodesNodeTableFlow generalAugMatchA =
+                a.getAugmentation(GeneralAugMatchNodesNodeTableFlow.class);
+        GeneralAugMatchNodesNodeTableFlow generalAugMatchB =
+                b.getAugmentation(GeneralAugMatchNodesNodeTableFlow.class);
+
+        if (generalAugMatchA != null && generalAugMatchB != null) {
+            // if both have GeneralAugMatchNodesNodeTableFlow augmentation
+            // create sets of ExtentionList type (not a List/Collection at all, as of yet)
+            Set<ExtensionList> setA = new HashSet<>();
+            Set<ExtensionList> setB = new HashSet<>();
+            if (generalAugMatchA.getExtensionList() != null) {
+                setA = new HashSet<>(generalAugMatchA.getExtensionList());
+            }
+            if (generalAugMatchB.getExtensionList() != null) {
+                setB = new HashSet<>(generalAugMatchB.getExtensionList());
+            }
+            if (!setA.equals(setB)) {
+                return false;
+            }
+
+        } else if ((generalAugMatchA == null && generalAugMatchB != null)
+                || generalAugMatchA != null) {
+            // if only one has GeneralAugMatchNodesNodeTableFlow augmentation, they are not equal
+            return false;
+        } // if no-one has GeneralAugMatchNodesNodeTableFlow augmentation, continue matching
+
+        return true;
+    }
+
+    @Override
+    protected int doHash(Match m) {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((m.getEthernetMatch() == null) ? 0 : m.getEthernetMatch().hashCode());
+        result = prime * result + ((m.getIcmpv4Match() == null) ? 0 : m.getIcmpv4Match().hashCode());
+        result = prime * result + ((m.getIcmpv6Match() == null) ? 0 : m.getIcmpv6Match().hashCode());
+        result = prime * result + ((m.getInPhyPort() == null) ? 0 : m.getInPhyPort().hashCode());
+        result = prime * result + ((m.getInPort() == null) ? 0 : m.getInPort().hashCode());
+        result = prime * result + ((m.getIpMatch() == null) ? 0 : m.getIpMatch().hashCode());
+        result = prime * result + ((m.getLayer3Match() == null) ? 0 : m.getLayer3Match().hashCode());
+        result = prime * result + ((m.getLayer4Match() == null) ? 0 : m.getLayer4Match().hashCode());
+        result = prime * result + ((m.getMetadata() == null) ? 0 : m.getMetadata().hashCode());
+        result = prime * result + ((m.getProtocolMatchFields() == null) ? 0 : m.getProtocolMatchFields().hashCode());
+        result = prime * result + ((m.getTcpFlagMatch() == null) ? 0 : m.getTcpFlagMatch().hashCode());
+        result = prime * result + ((m.getTunnel() == null) ? 0 : m.getTunnel().hashCode());
+        result = prime * result + ((m.getVlanMatch() == null) ? 0 : m.getVlanMatch().hashCode());
+
+        GeneralAugMatchNodesNodeTableFlow generalAugMatch =
+                m.getAugmentation(GeneralAugMatchNodesNodeTableFlow.class);
+        if (generalAugMatch != null) {
+            List<ExtensionList> augMatchExtensionList = generalAugMatch.getExtensionList();
+            Set<ExtensionList> extensionListSet = new HashSet<>();
+            if (augMatchExtensionList != null) {
+                extensionListSet =
+                        new HashSet<>(augMatchExtensionList);
+            }
+            result = prime * result + extensionListSet.hashCode();
+        }
+        return result;
+    }
+}