Write openflow flows to datastore in a single transaction per RSP 48/39648/3
authorDiego Granados <[email protected]>
Fri, 13 May 2016 10:52:18 +0000 (12:52 +0200)
committerDiego Granados <[email protected]>
Tue, 14 Jun 2016 08:09:19 +0000 (10:09 +0200)
- Much faster (3x faster on RSP creation; 4x faster on deletion)
- Transactionality will guarantee flow coherence even in the
  event of errors during writeup / deletion
- Initial work by Brady Johnson <[email protected]>

Change-Id: I49ba138bcb13cd43566968ece6e318bb0417d542
Signed-off-by: Diego Granados <[email protected]>
sfc-renderers/sfc-openflow-renderer/src/main/java/org/opendaylight/sfc/ofrenderer/SfcOfRspProcessor.java
sfc-renderers/sfc-openflow-renderer/src/main/java/org/opendaylight/sfc/ofrenderer/openflow/SfcOfFlowProgrammerImpl.java
sfc-renderers/sfc-openflow-renderer/src/main/java/org/opendaylight/sfc/ofrenderer/openflow/SfcOfFlowProgrammerInterface.java
sfc-renderers/sfc-openflow-renderer/src/main/java/org/opendaylight/sfc/ofrenderer/openflow/SfcOfFlowWriterImpl.java
sfc-renderers/sfc-openflow-renderer/src/main/java/org/opendaylight/sfc/ofrenderer/openflow/SfcOfFlowWriterInterface.java
sfc-renderers/sfc-openflow-renderer/src/test/java/org/opendaylight/sfc/ofrenderer/SfcOfFlowProgrammerTest.java
sfc-renderers/sfc-openflow-renderer/src/test/java/org/opendaylight/sfc/ofrenderer/SfcOfRspProcessorTest.java
sfc-util/sfc-openflow-utils/src/main/java/org/opendaylight/sfc/util/openflow/SfcOpenflowUtils.java

index c0337ba7bda68e3711c5ed0d2fb9e82699fb9d14..baccc498ab8f2d1534603f38052a0b8ce7adca99 100644 (file)
@@ -74,7 +74,6 @@ public class SfcOfRspProcessor {
             // Populate the SFF Connection Graph
             //
             SffGraph sffGraph = populateSffGraph(rsp);
-
             SfcRspTransportProcessorBase transportProcessor = getTransportProcessor(sffGraph, rsp);
 
             //
@@ -107,11 +106,16 @@ public class SfcOfRspProcessor {
                 configureTransportEgressFlows(entry, sffGraph, transportProcessor);
             }
 
+            // Flush the flows to the data store
+            this.sfcOfFlowProgrammer.flushFlows();
+
             LOG.info("Processing complete for RSP: name [{}] Id [{}]", rsp.getName(), rsp.getPathId());
 
         } catch (RuntimeException e) {
             LOG.error("RuntimeException in processRenderedServicePath: ", e.getMessage(), e);
         } finally {
+            // If there were any errors, purge any remaining flows so they're not written
+            this.sfcOfFlowProgrammer.purgeFlows();
             sfcSynchronizer.unlock();
             sfcOfProviderUtils.removeRsp(rsp.getPathId());
         }
@@ -123,7 +127,7 @@ public class SfcOfRspProcessor {
      * @param rsp - the Rendered Service Path to delete
      */
     public void deleteRenderedServicePath(RenderedServicePath rsp) {
-        Set<NodeId> clearedSffNodeIDs = sfcOfFlowProgrammer.deleteRspFlowsAndClearSFFsIfNoRspExists(rsp.getPathId());
+        Set<NodeId> clearedSffNodeIDs = sfcOfFlowProgrammer.deleteRspFlows(rsp.getPathId());
         for(NodeId sffNodeId : clearedSffNodeIDs){
             setSffInitialized(sffNodeId, false);
         }
index 495ad99b891155cc695586211e8ecf0c6cdd4fc2..749d30743a961d5d05d81d17fd3058e8eca29887 100644 (file)
@@ -151,9 +151,21 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
     }
 
     @Override
-    public Set<NodeId> deleteRspFlowsAndClearSFFsIfNoRspExists(final Long rspId) {
+    public Set<NodeId> deleteRspFlows(final Long rspId) {
         sfcOfFlowWriter.deleteRspFlows(rspId);
-        return sfcOfFlowWriter.clearSffsIfNoRspExists();
+        Set<NodeId> nodes = sfcOfFlowWriter.clearSffsIfNoRspExists();
+        sfcOfFlowWriter.deleteFlowSet();
+        return nodes;
+    }
+
+    @Override
+    public void flushFlows() {
+        this.sfcOfFlowWriter.flushFlows();
+    }
+
+    @Override
+    public void purgeFlows() {
+        this.sfcOfFlowWriter.purgeFlows();
     }
 
     /**
@@ -197,7 +209,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTableMatchAnyFlow(
                         getTableId(TABLE_INDEX_CLASSIFIER),
                         getTableId(TABLE_INDEX_TRANSPORT_INGRESS));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -215,7 +227,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         FlowBuilder flowBuilder =
                 configureTableMatchAnyDropFlow(
                         getTableId(TABLE_INDEX_TRANSPORT_INGRESS));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -230,7 +242,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTableMatchAnyFlow(
                         getTableId(TABLE_INDEX_PATH_MAPPER),
                         getTableId(TABLE_INDEX_PATH_MAPPER_ACL));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -245,7 +257,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTableMatchAnyFlow(
                         getTableId(TABLE_INDEX_PATH_MAPPER_ACL),
                         getTableId(TABLE_INDEX_NEXT_HOP));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -260,7 +272,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTableMatchAnyFlow(
                         getTableId(TABLE_INDEX_NEXT_HOP),
                         getTableId(TABLE_INDEX_TRANSPORT_EGRESS));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -274,7 +286,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         FlowBuilder flowBuilder =
                 configureTableMatchAnyDropFlow(
                         getTableId(TABLE_INDEX_TRANSPORT_EGRESS));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
     }
 
     /**
@@ -368,13 +380,13 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTransportIngressFlow(
                         SfcOpenflowUtils.ETHERTYPE_IPV4,
                         SfcOpenflowUtils.IP_PROTOCOL_TCP);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlowTcp);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlowTcp);
 
         FlowBuilder transportIngressFlowUdp =
                 configureTransportIngressFlow(
                         SfcOpenflowUtils.ETHERTYPE_IPV4,
                         SfcOpenflowUtils.IP_PROTOCOL_UDP);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlowUdp);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlowUdp);
     }
 
     /**
@@ -395,7 +407,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         match.setVlanMatch(vlanBuilder.build());
 
         FlowBuilder transportIngressFlow = configureTransportIngressFlow(match);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
     }
 
     /**
@@ -410,7 +422,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportIngressFlow =
                 configureTransportIngressFlow(match, getTableId(TABLE_INDEX_NEXT_HOP));
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
     }
 
     /**
@@ -422,7 +434,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
     public void configureMplsTransportIngressFlow(final String sffNodeName) {
         FlowBuilder transportIngressFlow =
                 configureTransportIngressFlow(SfcOpenflowUtils.ETHERTYPE_MPLS_UCAST);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
     }
 
     /**
@@ -541,7 +553,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                         "ingress_Transport_Arp_Flow",
                         match, isb);
 
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, arpTransportIngressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, arpTransportIngressFlow);
     }
 
     @Override
@@ -578,7 +590,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                         "ingress_Transport_Arp_Flow",
                         match, isb);
 
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, sfFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, sfFlow);
     }
 
     @Override
@@ -613,7 +625,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                         "ingress_Transport_Arp_Flow",
                         match, isb);
 
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, sfFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, sfFlow);
     }
 
 
@@ -644,7 +656,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         } else {
             pathMapperFlow = configurePathMapperFlow(pathId, match, actionList);
         }
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, pathMapperFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, pathMapperFlow);
     }
 
     /**
@@ -670,7 +682,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         } else {
             pathMapperFlow = configurePathMapperFlow(pathId, match, actionList);
         }
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, pathMapperFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, pathMapperFlow);
     }
 
     /**
@@ -811,7 +823,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         // Set an idle timeout on this flow
         ingressFlow.setIdleTimeout(PKTIN_IDLE_TIMEOUT);
 
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, ingressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, ingressFlow);
     }
 
     //
@@ -848,7 +860,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         }
 
         FlowBuilder nextHopFlow = configureNextHopFlow(match, actionList, flowPriority);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
     }
 
     /**
@@ -875,7 +887,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
         }
 
         FlowBuilder nextHopFlow = configureNextHopFlow(match, actionList);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
     }
 
     /**
@@ -961,7 +973,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportEgressFlow =
                 configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     /**
@@ -1015,7 +1027,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportEgressFlow =
                 configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     /**
@@ -1069,7 +1081,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportEgressFlow =
                 configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     /**
@@ -1098,7 +1110,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportEgressFlow =
                 configureTransportEgressFlow(match, actionList, port, order);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     @Override
@@ -1118,7 +1130,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
 
         FlowBuilder transportEgressFlow =
                 configureTransportEgressFlow(match, actionList, port, order);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     /**
@@ -1148,7 +1160,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTransportEgressFlow(
                         match, new ArrayList<Action>(), port,
                         order, FLOW_PRIORITY_TRANSPORT_EGRESS + 10);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     @Override
@@ -1183,7 +1195,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 configureTransportEgressFlow(
                         match, actionList, EMPTY_SWITCH_PORT,
                         order, FLOW_PRIORITY_TRANSPORT_EGRESS + 10);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
     }
 
     /**
@@ -1400,7 +1412,7 @@ public class SfcOfFlowProgrammerImpl implements SfcOfFlowProgrammerInterface {
                 SfcOpenflowUtils.createFlowBuilder(
                         getTableId(TABLE_INDEX_NEXT_HOP),
                         flowPriority, "nextHop", match, isb);
-        sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+        sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
     }
 
     private static BigInteger getMetadataSFP(long sfpId) {
index a2c02317f3af3ee665108e9dddd3ccb7262b2957..aca5a325bbc33412264aecfccba96c8efa664e6a 100644 (file)
@@ -49,7 +49,14 @@ public interface SfcOfFlowProgrammerInterface {
      *
      * @return Node IDs from which initialization flows were removed.
      */
-    public Set<NodeId> deleteRspFlowsAndClearSFFsIfNoRspExists(final Long rspId);
+    public Set<NodeId> deleteRspFlows(final Long rspId);
+
+    // Write any buffered flows to the data store
+    public void flushFlows();
+
+    // Purge any unwritten flows not written yet. This should be called upon
+    // errors, when the remaining buffered flows should not be written.
+    public void purgeFlows();
 
     //Set FlowWriter implementation
     public void setFlowWriter(SfcOfFlowWriterInterface sfcOfFlowWriter);
index 3d3f2fa463f99774d993c78d395565056ad41109..b1a5ba4d75c2b8db5fa6bba10feb34aec35ab1bd 100644 (file)
@@ -17,11 +17,13 @@ import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.sfc.provider.OpendaylightSfc;
 import org.opendaylight.sfc.provider.api.SfcDataStoreAPI;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
@@ -41,38 +43,51 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.CheckedFuture;
+
 /**
  * Set of instructions in order to interact with MD-SAL datastore.
  * <p>
  *
  * @author Brady Johnson ([email protected])
  * @author Ricardo Noriega ([email protected])
+ * @author Diego Granados ([email protected])
  * @since 2015-11-25
  */
 
 public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
-    private static final int SCHEDULED_THREAD_POOL_SIZE = 1;
-    private static final int QUEUE_SIZE = 1000;
-    private static final int ASYNC_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
     private static final long SHUTDOWN_TIME = 5;
-    private static final String LOGSTR_THREAD_QUEUE_FULL = "Thread Queue is full, cant execute action: {}";
+    private static final String LOGSTR_THREAD_EXCEPTION = "Exception executing Thread: {}";
     private static final Logger LOG = LoggerFactory.getLogger(SfcOfFlowWriterImpl.class);
 
+    //private ExecutorService threadPoolExecutorServiceDelete;
     private ExecutorService threadPoolExecutorService;
+
     private FlowBuilder flowBuilder;
+    // Store RspId to List of FlowDetails, to be able
+    // to delete all flows for a particular RSP
     private Map<Long, List<FlowDetails>> rspNameToFlowsMap;
 
+    //temporary list of flows to be deleted. All of them will be transactionally deleted on
+    // deleteFlowSet() invokation
+    private Set<FlowDetails> setOfFlowsToDelete;
+    // temporary list of flows to be deleted. All of them will be transactionally deleted on
+    // flushFlows() invokation
+    private Set<FlowDetails> setOfFlowsToAdd;
+
     public SfcOfFlowWriterImpl() {
-        // Not using an Executors.newSingleThreadExecutor() here, since it creates
-        // an Executor that uses a single worker thread operating off an unbounded
-        // queue, and we want to be able to limit the size of the queue
-        this.threadPoolExecutorService = new ThreadPoolExecutor(SCHEDULED_THREAD_POOL_SIZE, SCHEDULED_THREAD_POOL_SIZE,
-                ASYNC_THREAD_POOL_KEEP_ALIVE_TIME_SECS, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
+
+        this.threadPoolExecutorService = Executors.newSingleThreadExecutor();;
         this.rspNameToFlowsMap = new HashMap<Long, List<FlowDetails>>();
         this.flowBuilder = null;
+        this.setOfFlowsToDelete = new HashSet<FlowDetails>();
+        this.setOfFlowsToAdd = new HashSet<FlowDetails>();
     }
 
+    /**
+     * Shutdown the thread pool
+     */
+    @Override
     public void shutdown() throws ExecutionException, InterruptedException {
         // When we close this service we need to shutdown our executor!
         threadPoolExecutorService.shutdown();
@@ -85,128 +100,180 @@ public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
     }
 
     /**
-     * A thread class used to write the flows to the data store.
+     * A thread class used to write the flows to the data store. It receives the list of flows to create at creation time.
+     * The flows are written together in a single data store transaction
      */
-    class FlowWriterTask implements Runnable {
-        String sffNodeName;
-        InstanceIdentifier<Flow> flowInstanceId;
-        FlowBuilder flowBuilder;
+    class FlowSetWriterTask implements Runnable {
+        Set<FlowDetails> flowsToWrite = new HashSet<FlowDetails>();
 
-        public FlowWriterTask(String sffNodeName, InstanceIdentifier<Flow> flowInstanceId, FlowBuilder flowBuilder) {
-            this.sffNodeName = sffNodeName;
-            this.flowInstanceId = flowInstanceId;
-            this.flowBuilder = flowBuilder;
+        public FlowSetWriterTask(Set<FlowDetails> flowsToWrite) {
+            this.flowsToWrite.addAll(flowsToWrite);
         }
 
         public void run(){
-            if (!SfcDataStoreAPI.writeMergeTransactionAPI(
-                    this.flowInstanceId,
-                    this.flowBuilder.build(),
-                    LogicalDatastoreType.CONFIGURATION)) {
-                LOG.error("{}: Failed to create Flow on node: {}", Thread.currentThread().getStackTrace()[1], this.sffNodeName);
+            WriteTransaction trans = OpendaylightSfc.getOpendaylightSfcObj().getDataProvider().newWriteOnlyTransaction();
+
+            LOG.debug("FlowSetWriterTask: starting addition of {} flows", flowsToWrite.size());
+
+            for (FlowDetails f: flowsToWrite) {
+
+                NodeBuilder nodeBuilder = new NodeBuilder();
+                nodeBuilder.setId(new NodeId(f.sffNodeName));
+                nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+
+                InstanceIdentifier<Flow> iidFlow = InstanceIdentifier.builder(Nodes.class)
+                            .child(Node.class, nodeBuilder.getKey())
+                            .augmentation(FlowCapableNode.class)
+                            .child(Table.class, f.tableKey)
+                            .child(Flow.class, f.flowKey)
+                            .build();
+
+                // No need to read previously existing flows. Merge will take care of that
+                trans.merge(LogicalDatastoreType.CONFIGURATION, iidFlow, f.flow, true);
+            }
+
+            CheckedFuture<Void, TransactionCommitFailedException> submitFuture = trans.submit();
+
+            try {
+                submitFuture.checkedGet();
+            } catch (TransactionCommitFailedException e) {
+                LOG.error("deleteTransactionAPI: Transaction failed. Message: {}", e.getMessage());
             }
         }
     }
 
     /**
-     * A thread class used to remove flows from the data store.
+     * A thread class used to transactionally delete a set of flows belonging to a given RSP in a single transaction
      */
-    class FlowRemoverTask implements Runnable {
-        String sffNodeName;
-        InstanceIdentifier<Flow> flowInstanceId;
+    class FlowSetRemoverTask implements Runnable {
 
-        public FlowRemoverTask(String sffNodeName, InstanceIdentifier<Flow> flowInstanceId) {
-            this.flowInstanceId = flowInstanceId;
-            this.sffNodeName = sffNodeName;
+        Set<FlowDetails> flowsToDelete = new HashSet<FlowDetails>();
+
+        public FlowSetRemoverTask(Set<FlowDetails> flowsToDelete) {
+            this.flowsToDelete.addAll(flowsToDelete);
         }
 
         public void run(){
-            if (!SfcDataStoreAPI.deleteTransactionAPI(flowInstanceId, LogicalDatastoreType.CONFIGURATION)) {
-                LOG.error("{}: Failed to remove Flow on node: {}", Thread.currentThread().getStackTrace()[1], sffNodeName);
+
+            WriteTransaction writeTx = OpendaylightSfc.getOpendaylightSfcObj().getDataProvider().newWriteOnlyTransaction();
+
+            LOG.debug("FlowSetRemoverTask: starting deletion of {} flows", flowsToDelete.size());
+
+            for (FlowDetails f: flowsToDelete) {
+
+                NodeBuilder nodeBuilder = new NodeBuilder();
+                nodeBuilder.setId(new NodeId(f.sffNodeName));
+                nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+
+                InstanceIdentifier<Flow> iidFlow = InstanceIdentifier.builder(Nodes.class)
+                            .child(Node.class, nodeBuilder.getKey())
+                            .augmentation(FlowCapableNode.class)
+                            .child(Table.class, f.tableKey)
+                            .child(Flow.class, f.flowKey)
+                            .build();
+
+                writeTx.delete(LogicalDatastoreType.CONFIGURATION, iidFlow);
+            }
+
+            CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+            try {
+                submitFuture.checkedGet();
+            } catch (TransactionCommitFailedException e) {
+                LOG.error("deleteTransactionAPI: Transaction failed. Message: {}", e.getMessage());
             }
         }
     }
 
     /**
-     * Internal class used to store the details of a flow for easy deletion later
+     * Internal class used to store the details of a flow for easy creation / deletion later
      */
-    private class FlowDetails {
+    public class FlowDetails {
 
         public String sffNodeName;
         public FlowKey flowKey;
         public TableKey tableKey;
+        public Flow flow;
 
-        public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey) {
+        /**
+         * This constructor is used for storing flows to be added
+         */
+        public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey, Flow flow) {
             this.sffNodeName = sffNodeName;
             this.flowKey = flowKey;
             this.tableKey = tableKey;
+            this.flow = flow;
+        }
+
+        /**
+         * This constructor is used for storing flows to be deleted. Only the path ids are needed
+         */
+        public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey) {
+            this(sffNodeName, flowKey, tableKey, null);
         }
     }
 
     /**
-     * Write a flow to the DataStore
+     * Store a flow to be written later. The flows will be stored per
+     * SFF and table. Later, when flushFlows() is called, all the flows
+     * will be written. The tableId is taken from the FlowBuilder.
      *
      * @param sffNodeName - which SFF to write the flow to
      * @param flow - details of the flow to be written
      */
     @Override
-    public void writeFlowToConfig(Long rspId, String sffNodeName,
-            FlowBuilder flow) {
+    public void writeFlow(Long rspId, String sffNodeName, FlowBuilder flow) {
+        this.flowBuilder = flow;
 
-        // Create the NodeBuilder
-        NodeBuilder nodeBuilder = new NodeBuilder();
-        nodeBuilder.setId(new NodeId(sffNodeName));
-        nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
-
-        // Create the flow path, which will include the Node, Table, and Flow
-        InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
-            .child(Node.class, nodeBuilder.getKey())
-            .augmentation(FlowCapableNode.class)
-            .child(Table.class, new TableKey(flow.getTableId()))
-            .child(Flow.class, flow.getKey())
-            .build();
+        LOG.debug("writeFlow storing flow to Node {}, table {}", sffNodeName, flow.getTableId());
 
-        LOG.debug("writeFlowToConfig writing flow to Node {}, table {}", sffNodeName, flow.getTableId());
+        // Add the flow to the set of flows to be added in a single transaction
+        setOfFlowsToAdd.add(new FlowDetails(sffNodeName, flow.getKey(), new TableKey(flow.getTableId()), flowBuilder.build()));
 
+        // This will store the flow info and rspId for removal later
         storeFlowDetails(rspId, sffNodeName, flow.getKey(), flow.getTableId());
+    }
 
-        FlowWriterTask writerThread = new FlowWriterTask(sffNodeName, flowInstanceId, flow);
-        try {
-            threadPoolExecutorService.execute(writerThread);
-        } catch (Exception ex) {
-            LOG.error(LOGSTR_THREAD_QUEUE_FULL, ex.toString());
-        }
+    @Override
+    public void removeFlow(String sffNodeName, FlowKey flowKey,
+            TableKey tableKey) {
+
+      LOG.debug("removeFlow: removing flow with key {} from table {} in sff {}", flowKey, tableKey, sffNodeName);
+
+      FlowDetails flowDetail = new FlowDetails(sffNodeName, flowKey, tableKey);
+      setOfFlowsToDelete.add(flowDetail);
     }
 
     /**
-     * Remove a Flow from the DataStore
-     *
-     * @param sffNodeName - which SFF the flow is in
-     * @param flowKey - The flow key of the flow to be removed
-     * @param tableKey - The table the flow was written to
+     * From previous calls to writeFlowToConfig(), flows were stored per table
+     * and per SFF. Now the flows will be written, one table at at time per SFF.
      */
     @Override
-    public void removeFlowFromConfig(String sffNodeName, FlowKey flowKey,
-            TableKey tableKey) {
+    public void flushFlows() {
 
-        NodeBuilder nodeBuilder = new NodeBuilder();
-        nodeBuilder.setId(new NodeId(sffNodeName));
-        nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+        LOG.info("flushFlows: creating flowWriter task, writing [{}] flows.",
+                setOfFlowsToAdd.size());
 
-        // Create the flow path
-        InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
-            .child(Node.class, nodeBuilder.getKey())
-            .augmentation(FlowCapableNode.class)
-            .child(Table.class, tableKey)
-            .child(Flow.class, flowKey)
-            .build();
+        FlowSetWriterTask writerThread = new FlowSetWriterTask(setOfFlowsToAdd);
 
-        FlowRemoverTask removerThread = new FlowRemoverTask(sffNodeName, flowInstanceId);
         try {
-            threadPoolExecutorService.execute(removerThread);
+            threadPoolExecutorService.execute(writerThread);
         } catch (Exception ex) {
-            LOG.error(LOGSTR_THREAD_QUEUE_FULL, ex.toString());
+            LOG.error(LOGSTR_THREAD_EXCEPTION, ex.toString());
         }
+
+        // Clear the entries
+        setOfFlowsToAdd.clear();
+
+    }
+
+    /**
+     * Purge any unwritten flows not written-deleted yet. This should be called upon
+     * errors, when the remaining buffered flows should not be persisted
+     */
+    @Override
+    public void purgeFlows() {
+        setOfFlowsToAdd.clear();
+        setOfFlowsToDelete.clear();
     }
 
     /**
@@ -217,8 +284,7 @@ public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
      * @param flowKey - the flow key of the new flow
      * @param tableId - the table the flow was written to
      */
-    @Override
-    public void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId) {
+    private void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId) {
         List<FlowDetails> flowDetails = rspNameToFlowsMap.get(rspId);
         if (flowDetails == null) {
             flowDetails = new ArrayList<FlowDetails>();
@@ -255,13 +321,18 @@ public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
         }
     }
 
+    /**
+     * Return the last flow builder
+     * Used mainly in Unit Testing
+     */
     @Override
     public FlowBuilder getFlowBuilder() {
         return this.flowBuilder;
     }
 
     /**
-     * Delete all flows created for the given rspId
+     * Delete all flows created for the given rspId (flows are stored in a deletion buffer;
+     * actual transactional deletion is performed upon deleteFlowSet() invokation
      *
      * @param rspId - the rspId to delete flows for
      */
@@ -274,9 +345,23 @@ public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
         }
 
         rspNameToFlowsMap.remove(rspId);
-        for (FlowDetails flowDetails : flowDetailsList) {
-            removeFlowFromConfig(flowDetails.sffNodeName, flowDetails.flowKey, flowDetails.tableKey);
-        }
+        setOfFlowsToDelete.addAll(flowDetailsList);
+    }
+
+    @Override
+    public void deleteFlowSet() {
+
+        LOG.info("deleteFlowSet: deleting {} flows", setOfFlowsToDelete.size());
+        FlowSetRemoverTask fsrt = new FlowSetRemoverTask(setOfFlowsToDelete);
+            try {
+                threadPoolExecutorService.execute(fsrt);
+            } catch (Exception ex) {
+                LOG.error(LOGSTR_THREAD_EXCEPTION, ex.toString());
+            }
+
+        // Clear the entries
+        setOfFlowsToDelete.clear();
+
     }
 
     @Override
@@ -286,14 +371,16 @@ public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
         // RSPs, which can be deleted.
         Set<NodeId> sffNodeIDs = new HashSet<>();
         if (rspNameToFlowsMap.size() == 1) {
+            LOG.debug("clearSffIfNoRspExists:only one rsp - deleting all remaining flows");
             Set<Entry<Long, List<FlowDetails>>> entries = rspNameToFlowsMap.entrySet();
             List<FlowDetails> flowDetailsList = entries.iterator().next().getValue();
             for (FlowDetails flowDetails : flowDetailsList) {
-                removeFlowFromConfig(flowDetails.sffNodeName, flowDetails.flowKey, flowDetails.tableKey);
+                setOfFlowsToDelete.add(flowDetails);
                 sffNodeIDs.add(new NodeId(flowDetails.sffNodeName));
             }
             rspNameToFlowsMap.clear();
         }
         return sffNodeIDs;
     }
+
 }
index a87553103865e28465fc2e28f65c4d462f4df225..6311c35fc25a8a4aa2149d3fecf51a3565b5f094 100644 (file)
@@ -29,13 +29,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 public interface SfcOfFlowWriterInterface {
 
     //Write flows to MD-SAL datastore
-    public void writeFlowToConfig(final Long rspId, final String sffNodeName, FlowBuilder flow);
+    public void writeFlow(final Long rspId, final String sffNodeName, FlowBuilder flow);
 
     //Remove flows from MD-SAL datastore
-    public void removeFlowFromConfig(final String sffNodeName, FlowKey flowKey, TableKey tableKey);
-
-    //Store the flow details so it is easier to remove later
-    public void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId);
+    public void removeFlow(final String sffNodeName, FlowKey flowKey, TableKey tableKey);
 
     //Write group to MD-SAL datastore
     public void writeGroupToDataStore(String sffNodeName, GroupBuilder gb, boolean isAdd);
@@ -55,9 +52,18 @@ public interface SfcOfFlowWriterInterface {
      */
     public Set<NodeId> clearSffsIfNoRspExists();
 
-    //Get flow
+    // Get the most recent Flow Builder
     public FlowBuilder getFlowBuilder();
 
+    // Flush any flows that havent been written to the data store yet
+    public void flushFlows();
+
+    // Performs the deletion of any flows that havent been deleted from the data store yet
+    public void deleteFlowSet();
+
+    // Purge any flows that havent been written/deleted to/from the data store yet
+    public void purgeFlows();
+
     // If the impl uses threads, shut it down
     public void shutdown() throws ExecutionException, InterruptedException;
 
index 484b6d3f0c2c4e7f006b2406205a824253dbcf14..0ab661c4b9fe9ad7fa7192ee53150b45576429ed 100644 (file)
@@ -8,9 +8,9 @@
 
 package org.opendaylight.sfc.ofrenderer;
 
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.anyObject;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import org.mockito.stubbing.Answer;
@@ -111,7 +111,7 @@ public class SfcOfFlowProgrammerTest {
                 flowBuilder = (FlowBuilder) args[2];
                 return null;
             }
-        }).when(this.sfcOfFlowWriter).writeFlowToConfig(anyLong(), anyString(), (FlowBuilder) anyObject());
+        }).when(this.sfcOfFlowWriter).writeFlow(anyLong(), anyString(), (FlowBuilder) anyObject());
 
         // Configure Mockito to return the FlowBuilder stored by writeFlowToConfig()
         // when getFlowBuilder() is called
index 6d942b318b2327a49d7f60b5da088c4b42889c46..e34a5fe646cc27bc32e9db964bf1e041adc340d1 100644 (file)
@@ -150,6 +150,11 @@ public class SfcOfRspProcessorTest {
                 "SFF_0", "00:00:00:00:00:02", "00:00:00:00:00:00", 2, "1", 0, false);
         verify(this.flowProgrammerTestMoc, times(1)).configureVlanSfTransportEgressFlow(
                 "SFF_1", "00:00:00:00:00:07", "00:00:00:00:00:05", 3, "1", 0, false);
+
+        // verify flow flushing
+        verify(this.flowProgrammerTestMoc).flushFlows();
+        verify(this.flowProgrammerTestMoc).purgeFlows();
+
         verifyNoMoreInteractions(this.flowProgrammerTestMoc);
     }
 
@@ -205,6 +210,10 @@ public class SfcOfRspProcessorTest {
         verify(this.flowProgrammerTestMoc, times(1)).configureMplsLastHopTransportEgressFlow(eq("SFF_1"), anyString(),
                 anyString(), anyLong(), anyString(), anyLong());
 
+        // verify flow flushing
+        verify(this.flowProgrammerTestMoc).flushFlows();
+        verify(this.flowProgrammerTestMoc).purgeFlows();
+
         verifyNoMoreInteractions(this.flowProgrammerTestMoc);
 
     }
@@ -247,6 +256,10 @@ public class SfcOfRspProcessorTest {
         verify(this.flowProgrammerTestMoc).configureVxlanGpeAppCoexistTransportEgressFlow(
                 eq("SFF_1"), anyLong(), anyShort(), anyString());
 
+        // verify flow flushing
+        verify(this.flowProgrammerTestMoc).flushFlows();
+        verify(this.flowProgrammerTestMoc).purgeFlows();
+
         verifyNoMoreInteractions(this.flowProgrammerTestMoc);
     }
 
@@ -285,6 +298,10 @@ public class SfcOfRspProcessorTest {
         verify(this.flowProgrammerTestMoc).configureVxlanGpeAppCoexistTransportEgressFlow(
                 "SFF_0", 0, (short) 254, "192.168.0.2");
 
+        // verify flow flushing
+        verify(this.flowProgrammerTestMoc).flushFlows();
+        verify(this.flowProgrammerTestMoc).purgeFlows();
+
         verifyNoMoreInteractions(this.flowProgrammerTestMoc);
     }
 
@@ -341,6 +358,10 @@ public class SfcOfRspProcessorTest {
         verify(this.flowProgrammerTestMoc, times(1)).configureVlanSfTransportEgressFlow(
                 eq("SFF_1"), anyString(), anyString(), anyInt(), anyString(), anyLong(), eq(true));
 
+        // verify flow flushing
+        verify(this.flowProgrammerTestMoc).flushFlows();
+        verify(this.flowProgrammerTestMoc).purgeFlows();
+
         verifyNoMoreInteractions(this.flowProgrammerTestMoc);
 
     }
index 4b1b57a67f16ff068fe5136c461a250ea78503cb..30973cdf8a22668ba4b93d39fffd65b501cd87a1 100644 (file)
@@ -1260,4 +1260,53 @@ public class SfcOpenflowUtils {
         }
         return false;
     }
+
+    /**
+     * Creates an Instance Identifier (path) for node with specified id
+     *
+     * @param nodeId the ID of the node
+     * @return the {@link InstanceIdentifier}
+     */
+    public static final InstanceIdentifier<Node> createNodePath(final NodeId nodeId) {
+        return InstanceIdentifier.builder(Nodes.class).child(Node.class, new NodeKey(nodeId)).build();
+    }
+
+    /**
+     * Creates a table path from a node ID and table ID
+     *
+     * @param nodeId the ID of the node
+     * @param tableId the ID of the table
+     * @return the {@link InstanceIdentifier}
+     */
+    public static final InstanceIdentifier<Table> createTablePath(final NodeId nodeId, final short tableId) {
+        return createNodePath(nodeId).builder()
+            .augmentation(FlowCapableNode.class)
+            .child(Table.class, new TableKey(tableId))
+            .build();
+    }
+
+    /**
+     * Creates a path for particular flow, by appending flow-specific information
+     * to table path.
+     *
+     * @param table the table iid
+     * @param flowKey the flow key
+     * @return the {@link InstanceIdentifier}
+     */
+    public static InstanceIdentifier<Flow> createFlowPath(final InstanceIdentifier<Table> table, final FlowKey flowKey) {
+        return table.child(Flow.class, flowKey);
+    }
+
+    /**
+     * Creates a path for particular flow, by appending flow-specific information
+     * to table path.
+     *
+     * @param table the table iid
+     * @param flowId the flow id
+     * @return the {@link InstanceIdentifier}
+     */
+    public static InstanceIdentifier<Flow> createFlowPath(final InstanceIdentifier<Table> table, final FlowId flowId) {
+        return createFlowPath(table, new FlowKey(flowId));
+    }
+
 }