Implement SFC integration
[groupbasedpolicy.git] renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
index 21cc3209e4fe2f303c01746aa56c1d3ed8f6252a..284028d0d411b91a693551cde65682adb472ed43 100644
@@ -8,8 +8,10 @@
 
 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -19,32 +21,35 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable.OfTableCtx;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
-import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
-import org.opendaylight.groupbasedpolicy.util.SetUtils;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -52,28 +57,32 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 
 /**
- * Manage policies on switches by subscribing to updates from the 
- * policy resolver and information about endpoints from the endpoint 
+ * Manage policies on switches by subscribing to updates from the
+ * policy resolver and information about endpoints from the endpoint
  * registry
- * @author readams
  */
-public class PolicyManager 
+public class PolicyManager
      implements SwitchListener, PolicyListener, EndpointListener {
-    private static final Logger LOG = 
+    private static final Logger LOG =
             LoggerFactory.getLogger(PolicyManager.class);
 
     private final SwitchManager switchManager;
     private final PolicyResolver policyResolver;
-    
+
     private final PolicyScope policyScope;
-    
-    private final AtomicReference<Dirty> dirty;
-    
+
     private final ScheduledExecutorService executor;
     private final SingletonTask flowUpdateTask;
+    private final DataBroker dataBroker;
 
     /**
      * The flow tables that make up the processing pipeline
@@ -86,39 +95,24 @@ public class PolicyManager
      */
     private final static int FLOW_UPDATE_DELAY = 250;
 
-    /**
-     * Counter used to allocate ordinal values for forwarding contexts
-     * and VNIDs
-     */
-    private final AtomicInteger policyOrdinal = new AtomicInteger(1);
-    
-    /**
-     * Keep track of currently-allocated ordinals
-     */
-    // XXX For the endpoint groups, we need a globally unique ordinal, so
-    // should ultimately involve some sort of distributed agreement
-    // or a leader to allocate them.  For now we'll just use a counter and
-    // this local map.  Also theoretically need to garbage collect periodically
-    private final ConcurrentMap<String, Integer> ordinals = 
-            new ConcurrentHashMap<>();
-    // XXX - need to garbage collect
-    private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals = 
-            new ConcurrentHashMap<>();
-            
+
     public PolicyManager(DataBroker dataBroker,
                          PolicyResolver policyResolver,
                          SwitchManager switchManager,
-                         EndpointManager endpointManager, 
+                         EndpointManager endpointManager,
                          RpcProviderRegistry rpcRegistry,
                          ScheduledExecutorService executor) {
         super();
         this.switchManager = switchManager;
         this.executor = executor;
         this.policyResolver = policyResolver;
+        this.dataBroker = dataBroker;
 
         if (dataBroker != null) {
             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-            t.put(LogicalDatastoreType.OPERATIONAL, 
+            t.put(LogicalDatastoreType.OPERATIONAL,
                   InstanceIdentifier
                       .builder(SubjectFeatureDefinitions.class)
                       .build(),
@@ -126,8 +120,12 @@ public class PolicyManager
             t.submit();
         }
 
-        OfTableCtx ctx = new OfTableCtx(dataBroker, rpcRegistry, 
-                                        this, policyResolver, switchManager, 
+        for (Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
+            policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
+        }
+
+        OfContext ctx = new OfContext(dataBroker, rpcRegistry,
+                                        this, policyResolver, switchManager,
                                         endpointManager, executor);
         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
                                         new GroupTable(ctx),
@@ -139,12 +137,10 @@ public class PolicyManager
         if (switchManager != null)
             switchManager.registerListener(this);
         endpointManager.registerListener(this);
-        
-        dirty = new AtomicReference<>(new Dirty());
-        
+
         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
         scheduleUpdate();
-        
+
         LOG.debug("Initialized OFOverlay policy manager");
     }
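
Note: the constructor now registers each Action exposed by SubjectFeatures with the policy resolver, and every change notification funnels into one debounced recomputation via flowUpdateTask. SingletonTask itself is not part of this diff; below is a minimal sketch of the coalescing behavior it is assumed to provide (FLOW_UPDATE_DELAY is 250 ms).

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Sketch only: assumes SingletonTask coalesces rapid reschedule() calls
    // into a single delayed execution, roughly like this.
    final class DebounceSketch {
        private final ScheduledExecutorService executor;
        private final Runnable task;
        private ScheduledFuture<?> pending;

        DebounceSketch(ScheduledExecutorService executor, Runnable task) {
            this.executor = executor;
            this.task = task;
        }

        synchronized void reschedule(long delay, TimeUnit unit) {
            if (pending != null) {
                pending.cancel(false); // abandon the earlier deadline
            }
            pending = executor.schedule(task, delay, unit);
        }

        public static void main(String[] args) {
            ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
            DebounceSketch d = new DebounceSketch(ses, new Runnable() {
                @Override
                public void run() {
                    System.out.println("flow update");
                }
            });
            d.reschedule(250, TimeUnit.MILLISECONDS);
            d.reschedule(250, TimeUnit.MILLISECONDS); // burst collapses to one run
            ses.shutdown(); // the pending delayed task still executes
        }
    }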
 
@@ -154,65 +150,36 @@ public class PolicyManager
 
     @Override
     public void switchReady(final NodeId nodeId) {
-//        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-//        
-//        NodeBuilder nb = new NodeBuilder()
-//            .setId(nodeId)
-//            .addAugmentation(FlowCapableNode.class, 
-//                             new FlowCapableNodeBuilder()
-//                                .build());
-//        t.merge(LogicalDatastoreType.CONFIGURATION, 
-//                FlowUtils.createNodePath(nodeId),
-//                nb.build(), true);
-//        ListenableFuture<Void> result = t.submit();
-//        Futures.addCallback(result, 
-//                            new FutureCallback<Void>() {
-//            @Override
-//            public void onSuccess(Void result) {
-//                dirty.get().addNode(nodeId);
-//                scheduleUpdate();
-//            }
-//
-//            @Override
-//            public void onFailure(Throwable t) {
-//                LOG.error("Could not add switch {}", nodeId, t);
-//            }
-//        });
-        
+        scheduleUpdate();
     }
 
     @Override
     public void switchRemoved(NodeId sw) {
         // XXX TODO purge switch flows
-        dirty.get().addNode(sw);
         scheduleUpdate();
     }
-    
+
     @Override
     public void switchUpdated(NodeId sw) {
-        dirty.get().addNode(sw);
         scheduleUpdate();
     }
 
     // ****************
     // EndpointListener
     // ****************
-    
+
     @Override
     public void endpointUpdated(EpKey epKey) {
-        dirty.get().addEndpoint(epKey);
         scheduleUpdate();
     }
 
     @Override
     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
-        dirty.get().addNodeEp(nodeId, epKey);
         scheduleUpdate();
     }
 
     @Override
     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
-        dirty.get().addEndpointGroupEp(egKey, epKey);
         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
         scheduleUpdate();
     }
@@ -220,12 +187,9 @@ public class PolicyManager
     // **************
     // PolicyListener
     // **************
-    
+
     @Override
     public void policyUpdated(Set<EgKey> updatedConsumers) {
-        for (EgKey key : updatedConsumers) {
-            dirty.get().addEndpointGroup(key);
-        }
         scheduleUpdate();
     }
 
@@ -241,89 +205,148 @@ public class PolicyManager
         // No-op for now
     }
 
-    /**
-     * Get a unique ordinal for the given condition group, suitable for
-     * use in the data plane.  This is unique only for this node, and not 
-     * globally.
-     * @param cg the {@link ConditionGroup}
-     * @return the unique ID
-     */
-    public int getCondGroupOrdinal(final ConditionGroup cg) {
-        if (cg == null) return 0;
-        Integer ord = cgOrdinals.get(cg);
-        if (ord == null) {
-            ord = policyOrdinal.getAndIncrement();
-            Integer old = cgOrdinals.putIfAbsent(cg, ord);
-            if (old != null) ord = old; 
-        }
-        return ord.intValue();
-    }
-    
-    /**
-     * Get a 32-bit context ordinal suitable for use in the OF data plane
-     * for the given policy item. 
-     * @param tenantId the tenant ID of the element
-     * @param id the unique ID for the element
-     * @return the 32-bit ordinal value
-     */
-    public int getContextOrdinal(final TenantId tenantId, 
-                                 final UniqueId id) throws Exception {
-        if (tenantId == null || id == null) return 0;
-        return getContextOrdinal(tenantId.getValue() + "|" + id.getValue());
-    }
 
-    /**
-     * Get a 32-bit context ordinal suitable for use in the OF data plane
-     * for the given policy item.
-     * @param id the unique ID for the element
-     * @return the 32-bit ordinal value
-     */
-    public int getContextOrdinal(final String id) throws Exception {
 
-        Integer ord = ordinals.get(id);
-        if (ord == null) {
-            ord = policyOrdinal.getAndIncrement();
-            Integer old = ordinals.putIfAbsent(id, ord);
-            if (old != null) ord = old;
-        }
-        return ord.intValue();
-    }
-    
     // **************
     // Implementation
     // **************
 
+    public class FlowMap {
+        private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
+
+        public FlowMap() {
+        }
+
+        public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
+            InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
+            TableBuilder table = this.flowMap.get(tableIid);
+            if (table == null) {
+                table = new TableBuilder().setId(tableId).setFlow(new ArrayList<Flow>());
+                // putIfAbsent keeps concurrent callers from clobbering each
+                // other's TableBuilder for the same table path
+                TableBuilder prev = this.flowMap.putIfAbsent(tableIid, table);
+                if (prev != null) {
+                    table = prev;
+                }
+            }
+            return table;
+        }
+
+        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
+            TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
+            if (!tableBuilder.getFlow().contains(flow)) {
+                tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
+            }
+        }
+
+        public void commitToDataStore() {
+            if (dataBroker != null) {
+                for (Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
+                    try {
+                        /*
+                         * Get the currently configured flows for
+                         * this table.
+                         */
+                        updateFlowTable(entry);
+                    } catch (Exception e) {
+                        LOG.warn("Couldn't read flow table {}", entry.getKey());
+                    }
+                }
+            }
+        }
+
+        private void updateFlowTable(Entry<InstanceIdentifier<Table>,
+                                     TableBuilder> entry) throws Exception {
+            Set<Flow> update = new HashSet<Flow>(entry.getValue().getFlow());
+            Set<Flow> curr = new HashSet<Flow>();
+
+            ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
+            Optional<Table> r =
+                   t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
+
+            if (r.isPresent()) {
+                Table curTable = r.get();
+                // getFlow() is null when the table exists but holds no flows
+                if (curTable.getFlow() != null) {
+                    curr = new HashSet<Flow>(curTable.getFlow());
+                }
+            }
+            Sets.SetView<Flow> deletions = Sets.difference(curr, update);
+            Sets.SetView<Flow> additions = Sets.difference(update, curr);
+            if (!deletions.isEmpty()) {
+                for (Flow f: deletions) {
+                    t.delete(LogicalDatastoreType.CONFIGURATION,
+                             FlowUtils.createFlowPath(entry.getKey(), f.getId()));
+                }
+            }
+            if (!additions.isEmpty()) {
+                for (Flow f: additions) {
+                    t.put(LogicalDatastoreType.CONFIGURATION,
+                          FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
+                }
+            }
+            CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
+            Futures.addCallback(f, new FutureCallback<Void>() {
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.error("Could not write flow table {}", t);
+                }
+
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.debug("Flow table updated.");
+                }
+            });
+        }
+
+        private void purgeFromDataStore() {
+            // TODO: tbachman: Remove for Lithium -- this is a workaround
+            //       where some flow-mods aren't getting installed
+            //       on vSwitches when changing L3 contexts
+            WriteTransaction d = dataBroker.newWriteOnlyTransaction();
+
+            for (Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
+                d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
+            }
+
+            CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
+            Futures.addCallback(fu, new FutureCallback<Void>() {
+                @Override
+                public void onFailure(Throwable th) {
+                    LOG.error("Could not write flow table.", th);
+                }
+
+                @Override
+                public void onSuccess(Void result) {
+                    LOG.debug("Flow table updated.");
+                }
+            });
+        }
+
+     }
+
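Note: commitToDataStore() reconciles rather than overwrites: updateFlowTable() diffs the staged flows against what CONFIGURATION already holds and writes only the delta. A tiny worked example of that delta computation (strings stand in for Flow objects, which the binding model compares by value):

    import java.util.Set;

    import com.google.common.collect.Sets;

    // Worked example of the Sets.difference() step in updateFlowTable().
    public class FlowDeltaExample {
        public static void main(String[] args) {
            Set<String> configured = Sets.newHashSet("flowB", "flowC");
            Set<String> desired = Sets.newHashSet("flowA", "flowB");
            System.out.println(Sets.difference(configured, desired)); // [flowC] -> delete
            System.out.println(Sets.difference(desired, configured)); // [flowA] -> put
        }
    }
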
     private void scheduleUpdate() {
         if (switchManager != null) {
             LOG.trace("Scheduling flow update task");
             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
         }
     }
-    
+
     /**
      * Update the flows on a particular switch
      */
     private class SwitchFlowUpdateTask implements Callable<Void> {
-        private final Dirty dirty;
-        private final NodeId nodeId;
+        private FlowMap flowMap;
 
-        public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
+        public SwitchFlowUpdateTask(FlowMap flowMap) {
             super();
-            this.dirty = dirty;
-            this.nodeId = nodeId;
+            this.flowMap = flowMap;
         }
 
         @Override
         public Void call() throws Exception {
-            if (!switchManager.isSwitchReady(nodeId)) return null;
-            PolicyInfo info = policyResolver.getCurrentPolicy();
-            if (info == null) return null;
-            for (OfTable table : flowPipeline) {
-                try {
-                    table.update(nodeId, info, dirty);
-                } catch (Exception e) {
-                    LOG.error("Failed to write flow table {}", 
-                              table.getClass().getSimpleName(), e);
+            PolicyInfo info = policyResolver.getCurrentPolicy();
+            if (info == null)
+                return null;
+            for (NodeId node : switchManager.getReadySwitches()) {
+                for (OfTable table : flowPipeline) {
+                    try {
+                        table.update(node, info, flowMap);
+                    } catch (Exception e) {
+                        LOG.error("Failed to write flow table {}",
+                                table.getClass().getSimpleName(), e);
+                    }
                 }
             }
             return null;
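
Note: SwitchFlowUpdateTask no longer carries per-node Dirty state; it walks every ready switch and hands each OfTable the shared FlowMap instead of letting tables write to the datastore directly. A hedged sketch of a pipeline table under the new contract follows; the table id, the flow contents, and the assumption that the OfTable base constructor takes the OfContext are illustrative, not taken from this diff.

    // Assumes this sits in the ofoverlay renderer package, so OfTable,
    // OfContext and PolicyManager resolve without imports.
    import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowId;
    import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;

    // Hypothetical table: stage desired flows in the shared FlowMap and let
    // PolicyManager diff and commit them. Matches the update() call site above.
    public class ExampleTable extends OfTable {
        private static final short TABLE_ID = 42; // illustrative only

        public ExampleTable(OfContext ctx) {
            super(ctx);
        }

        @Override
        public void update(NodeId nodeId, PolicyInfo policyInfo,
                           PolicyManager.FlowMap flowMap) throws Exception {
            Flow dropAll = new FlowBuilder()
                .setId(new FlowId("exampletable|dropall")) // stable id is the dedup key
                .setTableId(TABLE_ID)
                .setPriority(1)
                .build();
            flowMap.writeFlow(nodeId, TABLE_ID, dropAll);
        }
    }
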
@@ -333,25 +356,26 @@ public class PolicyManager
     /**
      * Update all flows on all switches as needed.  Note that this will block
      * one of the threads on the executor.
-     * @author readams
      */
     private class FlowUpdateTask implements Runnable {
         @Override
         public void run() {
             LOG.debug("Beginning flow update task");
 
-            Dirty d = dirty.getAndSet(new Dirty());
             CompletionService<Void> ecs
                 = new ExecutorCompletionService<Void>(executor);
             int n = 0;
-            for (NodeId node : switchManager.getReadySwitches()) {
-                SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(d, node);
-                ecs.submit(swut);
-                n += 1;
-            }
+
+            FlowMap flowMap = new FlowMap();
+
+            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
+            ecs.submit(swut);
+            n += 1;
+
             for (int i = 0; i < n; i++) {
                 try {
                     ecs.take().get();
+                    flowMap.commitToDataStore();
                 } catch (InterruptedException | ExecutionException e) {
                     LOG.error("Failed to update flow tables", e);
                 }
@@ -359,65 +383,9 @@ public class PolicyManager
             LOG.debug("Flow update completed");
         }
     }
-    
-    /**
-     * Dirty state since our last successful flow table sync.
-     */
-    public static class Dirty {
-        private Set<EpKey> endpoints;
-        private Set<NodeId> nodes;
-        private Set<EgKey> groups;
-        private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
-        private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
-        
-        public Dirty() {
-            ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
-            endpoints = Collections.newSetFromMap(epmap);
-            ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
-            nodes = Collections.newSetFromMap(nomap);
-            ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
-            groups = Collections.newSetFromMap(grmap);
-
-            groupEps = new ConcurrentHashMap<>();
-            nodeEps = new ConcurrentHashMap<>();
-        }
-        
-        public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
-            SetUtils.getNestedSet(egKey, groupEps)
-                .add(epKey);
-        }
-        public void addNodeEp(NodeId id, EpKey epKey) {
-            SetUtils.getNestedSet(id, nodeEps).add(epKey);
-        }
-        public void addNode(NodeId id) {
-            nodes.add(id);
-        }
-        public void addEndpointGroup(EgKey key) {
-            groups.add(key);
-        }
-        public void addEndpoint(EpKey epKey) {
-            endpoints.add(epKey);
-        }
 
-        public Set<EpKey> getEndpoints() {
-            return endpoints;
-        }
 
-        public Set<NodeId> getNodes() {
-            return nodes;
-        }
 
-        public Set<EgKey> getGroups() {
-            return groups;
-        }
 
-        public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
-            return groupEps;
-        }
 
-        public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {
-            return nodeEps;
-        }
-        
-    }
 }
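
Note: the Dirty class and the ordinal allocators (getCondGroupOrdinal, getContextOrdinal) are removed outright: every update cycle now rebuilds the desired flow state from current policy and reconciles against the datastore, so no incremental dirty bookkeeping is needed. Where ordinal allocation lives after this patch is not shown here; for reference, the removed methods all used the same counter-plus-putIfAbsent idiom, sketched standalone below.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // The allocation idiom from the removed getContextOrdinal(): a shared
    // counter plus putIfAbsent so racing threads agree on one ordinal per key.
    final class OrdinalAllocator {
        private final AtomicInteger next = new AtomicInteger(1);
        private final ConcurrentMap<String, Integer> ordinals = new ConcurrentHashMap<>();

        int ordinalFor(String key) {
            Integer ord = ordinals.get(key);
            if (ord == null) {
                ord = next.getAndIncrement();
                Integer prev = ordinals.putIfAbsent(key, ord);
                if (prev != null) {
                    ord = prev; // lost the race; reuse the winner's ordinal
                }
            }
            return ord;
        }
    }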