Bug 3302: fix for GroupTable
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
index 0e0a1d4e236ea7a99991a03f1314a006d0a33a1e..5e5b660654aa13a357ff8f8447ff4253bb51d27a 100755 (executable)
@@ -9,27 +9,28 @@
 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
@@ -51,8 +52,6 @@ import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
@@ -62,19 +61,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Equivalence;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * Manage policies on switches by subscribing to updates from the
  * policy resolver and information about endpoints from the endpoint
@@ -360,116 +346,6 @@ public class PolicyManager
     // Implementation
     // **************
 
-    public class FlowMap{
-        private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
-
-        public FlowMap() {
-        }
-
-        public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
-            InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
-            if(this.flowMap.get(tableIid) == null) {
-                this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
-                this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
-            }
-            return this.flowMap.get(tableIid);
-        }
-
-        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
-            TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
-            // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
-            List<Flow> flows = tableBuilder.getFlow();
-            Set<Equivalence.Wrapper<Flow>> wrappedFlows =
-                    new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
-            Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
-
-            if (!wrappedFlows.contains(wFlow)) {
-                tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
-            } else {
-                LOG.debug("Flow already exists in FlowMap - {}", flow);
-            }
-        }
-
-        public void commitToDataStore() {
-            if (dataBroker != null) {
-                for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
-                    try {
-                        /*
-                         * Get the currently configured flows for
-                         * this table.
-                         */
-                        updateFlowTable(entry);
-                    } catch (Exception e) {
-                        LOG.warn("Couldn't read flow table {}", entry.getKey());
-                    }
-                }
-            }
-        }
-
-        private void updateFlowTable(Entry<InstanceIdentifier<Table>,
-                                     TableBuilder> entry)  throws Exception {
-            // flows to update
-            Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
-            // flows currently in the table
-            Set<Flow> curr = new HashSet<>();
-
-            ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
-            Optional<Table> r =
-                   t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
-
-            if (r.isPresent()) {
-                Table currentTable = r.get();
-                curr = new HashSet<>(currentTable.getFlow());
-            }
-
-            // Sets with custom equivalence rules
-            Set<Equivalence.Wrapper<Flow>> oldFlows =
-                    new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-            Set<Equivalence.Wrapper<Flow>> updatedFlows =
-                    new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
-            // what is still there but was not updated, needs to be deleted
-            Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
-                    Sets.difference(oldFlows, updatedFlows);
-            // new flows (they were not there before)
-            Sets.SetView<Equivalence.Wrapper<Flow>> additions =
-                    Sets.difference(updatedFlows, oldFlows);
-
-            if (!deletions.isEmpty()) {
-                for (Equivalence.Wrapper<Flow> wf: deletions) {
-                    Flow f = wf.get();
-                    if (f != null) {
-                        t.delete(LogicalDatastoreType.CONFIGURATION,
-                                FlowUtils.createFlowPath(entry.getKey(), f.getId()));
-                    }
-                }
-            }
-            if (!additions.isEmpty()) {
-                for (Equivalence.Wrapper<Flow> wf: additions) {
-                    Flow f = wf.get();
-                    if (f != null) {
-                        t.put(LogicalDatastoreType.CONFIGURATION,
-                                FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
-                    }
-                }
-            }
-            CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
-            Futures.addCallback(f, new FutureCallback<Void>() {
-                @Override
-                public void onFailure(Throwable t) {
-                    LOG.error("Could not write flow table {}", t);
-                }
-
-                @Override
-                public void onSuccess(Void result) {
-                    LOG.debug("Flow table updated.");
-                }
-            });
-        }
-
-    }
-
     private void scheduleUpdate() {
         if (switchManager != null) {
             LOG.trace("Scheduling flow update task");
@@ -481,11 +357,11 @@ public class PolicyManager
      * Update the flows on a particular switch
      */
     private class SwitchFlowUpdateTask implements Callable<Void> {
-        private FlowMap flowMap;
+        private OfWriter ofWriter;
 
-        public SwitchFlowUpdateTask(FlowMap flowMap) {
+        public SwitchFlowUpdateTask(OfWriter ofWriter) {
             super();
-            this.flowMap = flowMap;
+            this.ofWriter = ofWriter;
         }
 
         @Override
@@ -496,9 +372,9 @@ public class PolicyManager
                     return null;
                 for (OfTable table : flowPipeline) {
                     try {
-                        table.update(node, info, flowMap);
+                        table.update(node, info, ofWriter);
                     } catch (Exception e) {
-                        LOG.error("Failed to write flow table {}",
+                        LOG.error("Failed to write Openflow table {}",
                                 table.getClass().getSimpleName(), e);
                     }
                 }
@@ -520,16 +396,16 @@ public class PolicyManager
                 = new ExecutorCompletionService<>(executor);
             int n = 0;
 
-            FlowMap flowMap = new FlowMap();
+            OfWriter ofWriter = new OfWriter();
 
-            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
+            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
             ecs.submit(swut);
             n+=1;
 
             for (int i = 0; i < n; i++) {
                 try {
                     ecs.take().get();
-                    flowMap.commitToDataStore();
+                    ofWriter.commitToDataStore(dataBroker);
                 } catch (InterruptedException | ExecutionException e) {
                     LOG.error("Failed to update flow tables", e);
                 }