GBP coexistence - OF renderer can remove only its own flows from node
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
index 5e5b660654aa13a357ff8f8447ff4253bb51d27a..d0277afa8c9e7a4ee520f2fc9fa07a9d826bca9c 100755 (executable)
@@ -8,10 +8,13 @@
 
 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -19,18 +22,16 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
+import org.opendaylight.groupbasedpolicy.dto.EgKey;
+import org.opendaylight.groupbasedpolicy.dto.EpKey;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
@@ -43,34 +44,43 @@ import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
-import org.opendaylight.groupbasedpolicy.resolver.EgKey;
-import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
-import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
-import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
-import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
+import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
+import org.opendaylight.groupbasedpolicy.util.IidFactory;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * Manage policies on switches by subscribing to updates from the
  * policy resolver and information about endpoints from the endpoint
  * registry
  */
 public class PolicyManager
-     implements SwitchListener, PolicyListener, EndpointListener {
+     implements SwitchListener, EndpointListener, DataTreeChangeListener<ResolvedPolicy>, Closeable {
     private static final Logger LOG =
             LoggerFactory.getLogger(PolicyManager.class);
 
+    private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
+    private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
+
     private short tableOffset;
     private static final short TABLEID_PORTSECURITY = 0;
     private static final short TABLEID_INGRESS_NAT =  1;
@@ -81,18 +91,13 @@ public class PolicyManager
     private static final short TABLEID_EXTERNAL_MAPPER = 6;
 
     private final SwitchManager switchManager;
-    private final PolicyResolver policyResolver;
+    private final EndpointManager endpointManager;
 
-    private final PolicyScope policyScope;
+    private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
 
     private final ScheduledExecutorService executor;
     private final SingletonTask flowUpdateTask;
     private final DataBroker dataBroker;
-    private final OfContext ofCtx;
-    /**
-     * The flow tables that make up the processing pipeline
-     */
-    private List<? extends OfTable> flowPipeline;
 
     /**
      * The delay before triggering the flow update task in response to an
@@ -101,16 +106,13 @@ public class PolicyManager
     private final static int FLOW_UPDATE_DELAY = 250;
 
     public PolicyManager(DataBroker dataBroker,
-                         PolicyResolver policyResolver,
                          SwitchManager switchManager,
                          EndpointManager endpointManager,
-                         RpcProviderRegistry rpcRegistry,
                          ScheduledExecutorService executor,
                          short tableOffset) {
         super();
         this.switchManager = switchManager;
         this.executor = executor;
-        this.policyResolver = policyResolver;
         this.dataBroker = dataBroker;
         this.tableOffset = tableOffset;
         try {
@@ -122,28 +124,18 @@ public class PolicyManager
         }
 
         if (dataBroker != null) {
-            WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-            t.put(LogicalDatastoreType.OPERATIONAL,
-                  InstanceIdentifier
-                      .builder(SubjectFeatureDefinitions.class)
-                      .build(),
-                  SubjectFeatures.OF_OVERLAY_FEATURES);
-            t.submit();
+            registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
+                    new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+                            InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
+                    this);
+        } else {
+            registerDataTreeChangeListener = null;
+            LOG.error("DataBroker is null. Listener for {} was not registered.",
+                    ResolvedPolicy.class.getCanonicalName());
         }
-
-        for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
-            policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
-        }
-
-        ofCtx = new OfContext(dataBroker, rpcRegistry,
-                                        this, policyResolver, switchManager,
-                                        endpointManager, executor);
-
-        flowPipeline = createFlowPipeline();
-
-        policyScope = policyResolver.registerListener(this);
         if (switchManager != null)
             switchManager.registerListener(this);
+        this.endpointManager = endpointManager;
         endpointManager.registerListener(this);
 
         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
@@ -152,7 +144,7 @@ public class PolicyManager
         LOG.debug("Initialized OFOverlay policy manager");
     }
 
-    private List<? extends OfTable> createFlowPipeline() {
+    private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
         // TODO - PORTSECURITY is kept in table 0.
         // According to openflow spec,processing on vSwitch always starts from table 0.
         // Packets will be droped if table 0 is empty.
@@ -169,8 +161,8 @@ public class PolicyManager
     }
 
     /**
-     * @param tableOffset - new offset value
-     * @return ListenableFuture<List> - to indicate that tables have been synced
+     * @param tableOffset the new offset value
+     * @return {@link ListenableFuture} to indicate that tables have been synced
      */
     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
         try {
@@ -188,7 +180,6 @@ public class PolicyManager
 
             @Override
             public Void apply(Void tablesRemoved) {
-                flowPipeline = createFlowPipeline();
                 scheduleUpdate();
                 return null;
             }
@@ -205,7 +196,7 @@ public class PolicyManager
         for (Short tableId : tableIDs) {
             for (NodeId nodeId : switchManager.getReadySwitches()) {
                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
-                checkList.add(deteleTableIfExists(rwTx, tablePath));
+                checkList.add(deleteTableIfExists(rwTx, tablePath));
             }
         }
         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
@@ -230,7 +221,7 @@ public class PolicyManager
         return tableIds;
     }
 
-    private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+    private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
 
         @Override
@@ -317,16 +308,25 @@ public class PolicyManager
 
     @Override
     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
-        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
+        // TODO a renderer should remove followed-EPG and followed-tenant at some point
+        if (dataBroker == null) {
+            LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
+            return;
+        }
+        WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+        FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
+        wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
+                egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
+        DataStoreHelper.submitToDs(wTx);
         scheduleUpdate();
     }
 
     // **************
-    // PolicyListener
+    // DataTreeChangeListener<ResolvedPolicy>
     // **************
 
     @Override
-    public void policyUpdated(Set<EgKey> updatedConsumers) {
+    public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
         scheduleUpdate();
     }
 
@@ -349,6 +349,11 @@ public class PolicyManager
     private void scheduleUpdate() {
         if (switchManager != null) {
             LOG.trace("Scheduling flow update task");
+
+            // Mark all existing flows as previous - will be compared with new ones
+            previousGbpFlows = actualGbpFlows;
+            actualGbpFlows = new HashMap<>();
+
             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
         }
     }
@@ -357,22 +362,22 @@ public class PolicyManager
      * Update the flows on a particular switch
      */
     private class SwitchFlowUpdateTask implements Callable<Void> {
-        private OfWriter ofWriter;
+        private final OfWriter ofWriter;
 
         public SwitchFlowUpdateTask(OfWriter ofWriter) {
-            super();
             this.ofWriter = ofWriter;
         }
 
         @Override
         public Void call() throws Exception {
+            OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
+            if (ofCtx.getCurrentPolicy() == null)
+                return null;
+            List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
             for (NodeId node : switchManager.getReadySwitches()) {
-                PolicyInfo info = policyResolver.getCurrentPolicy();
-                if (info == null)
-                    return null;
                 for (OfTable table : flowPipeline) {
                     try {
-                        table.update(node, info, ofWriter);
+                        table.sync(node, ofWriter);
                     } catch (Exception e) {
                         LOG.error("Failed to write Openflow table {}",
                                 table.getClass().getSimpleName(), e);
@@ -394,28 +399,29 @@ public class PolicyManager
 
             CompletionService<Void> ecs
                 = new ExecutorCompletionService<>(executor);
-            int n = 0;
 
             OfWriter ofWriter = new OfWriter();
 
             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
             ecs.submit(swut);
-            n+=1;
-
-            for (int i = 0; i < n; i++) {
-                try {
-                    ecs.take().get();
-                    ofWriter.commitToDataStore(dataBroker);
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to update flow tables", e);
-                }
+
+            try {
+                ecs.take().get();
+                // Current gbp flow must be independent, find out where this run() ends,
+                // set flows to one field and reset another
+                actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Failed to update flow tables", e);
             }
             LOG.debug("Flow update completed");
         }
     }
 
-
-
-
+    @Override
+    public void close() throws IOException {
+        if (registerDataTreeChangeListener != null)
+            registerDataTreeChangeListener.close();
+        // TODO unregister classifier and action instance validators
+    }
 
 }