GBP coexistence - OF renderer can remove only its own flows from node
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
index 2e40312724734188c231fee156b3aa48f731c2d3..d0277afa8c9e7a4ee520f2fc9fa07a9d826bca9c 100755 (executable)
@@ -12,7 +12,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -27,7 +29,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.dto.EgKey;
 import org.opendaylight.groupbasedpolicy.dto.EpKey;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
@@ -47,6 +48,7 @@ import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
 import org.opendaylight.groupbasedpolicy.util.IidFactory;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
@@ -76,6 +78,9 @@ public class PolicyManager
     private static final Logger LOG =
             LoggerFactory.getLogger(PolicyManager.class);
 
+    private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
+    private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
+
     private short tableOffset;
     private static final short TABLEID_PORTSECURITY = 0;
     private static final short TABLEID_INGRESS_NAT =  1;
@@ -103,7 +108,6 @@ public class PolicyManager
     public PolicyManager(DataBroker dataBroker,
                          SwitchManager switchManager,
                          EndpointManager endpointManager,
-                         RpcProviderRegistry rpcRegistry,
                          ScheduledExecutorService executor,
                          short tableOffset) {
         super();
@@ -192,7 +196,7 @@ public class PolicyManager
         for (Short tableId : tableIDs) {
             for (NodeId nodeId : switchManager.getReadySwitches()) {
                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
-                checkList.add(deteleTableIfExists(rwTx, tablePath));
+                checkList.add(deleteTableIfExists(rwTx, tablePath));
             }
         }
         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
@@ -217,7 +221,7 @@ public class PolicyManager
         return tableIds;
     }
 
-    private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+    private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
 
         @Override
@@ -345,6 +349,11 @@ public class PolicyManager
     private void scheduleUpdate() {
         if (switchManager != null) {
             LOG.trace("Scheduling flow update task");
+
+            // Mark all existing flows as previous - will be compared with new ones
+            previousGbpFlows = actualGbpFlows;
+            actualGbpFlows = new HashMap<>();
+
             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
         }
     }
@@ -390,21 +399,19 @@ public class PolicyManager
 
             CompletionService<Void> ecs
                 = new ExecutorCompletionService<>(executor);
-            int n = 0;
 
             OfWriter ofWriter = new OfWriter();
 
             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
             ecs.submit(swut);
-            n+=1;
-
-            for (int i = 0; i < n; i++) {
-                try {
-                    ecs.take().get();
-                    ofWriter.commitToDataStore(dataBroker);
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to update flow tables", e);
-                }
+
+            try {
+                ecs.take().get();
+                // Current gbp flow must be independent, find out where this run() ends,
+                // set flows to one field and reset another
+                actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Failed to update flow tables", e);
             }
             LOG.debug("Flow update completed");
         }