Switch to MD-SAL APIs
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
index c26d1bcee46ec6daf6a3f1553af4d0b3d8b5c160..ae73a6248c50511a17d2db11cc2c08246b261557 100644 (file)
@@ -8,69 +8,59 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowWriterConcurrent implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
+    public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
+            "Using Concurrent implementation of Flow Writer.";
     private final DataBroker dataBroker;
     private final ExecutorService flowPusher;
     private long startTime;
-    private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
-    private AtomicLong taskCompletionTime = new AtomicLong(0);
-    private final String UNITS = "ns";
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private final AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
         this.flowPusher = flowPusher;
-        LOG.info("Using Concurrent implementation of Flow Writer.");
+        LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
     }
 
-    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
-        LOG.info("Using Concurrent implementation of Flow Writer.");
+    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
+            short startTableId, short endTableId, boolean isCreateParents) {
+        LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
+                    sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
 
-    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                            short startTableId, short endTableId) {
-        LOG.info("Using Concurrent implementation of Flow Writer.");
+    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
+            short endTableId) {
+        LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
+                    startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
 
-    @Override
-    public long getFlowCount() {
-        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
-    }
-
-    @Override
-    public int getReadOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
     @Override
     public int getWriteOpStatus() {
         return writeOpStatus.get();
@@ -81,11 +71,6 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
-
     private class FlowHandlerTask implements Runnable {
         private final String dpId;
         private final boolean add;
@@ -95,16 +80,14 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         private final int sleepMillis;
         private final short startTableId;
         private final short endTableId;
-        private AtomicInteger remainingTxReturn = new AtomicInteger(0);
-
-        public FlowHandlerTask(final String dpId,
-                               final int flowsPerDpn,
-                               final boolean add,
-                               final int batchSize,
-                               final int sleepMillis,
-                               final int sleepAfter,
-                               final short startTableId,
-                               final short endTableId){
+        private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
+        private final boolean isCreateParents;
+
+        FlowHandlerTask(final String dpId, final int flowsPerDpn,
+                final boolean add, final int batchSize,
+                final int sleepMillis, final int sleepAfter,
+                final short startTableId, final short endTableId,
+                final boolean isCreateParents) {
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -113,43 +96,47 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             this.sleepAfter = sleepAfter;
             this.startTableId = startTableId;
             this.endTableId = endTableId;
-            remainingTxReturn.set(flowsPerDpn/batchSize);
+            this.isCreateParents = isCreateParents;
+            remainingTxReturn.set(flowsPerDpn / batchSize);
         }
 
         @Override
         public void run() {
-            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
+            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
+                    flowsPerDpn / batchSize);
             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
             short tableId = startTableId;
-            int numSubmits = flowsPerDpn/batchSize;
+            int numSubmits = flowsPerDpn / batchSize;
             int sourceIp = 1;
             int newBatchSize = batchSize;
 
             for (int i = 1; i <= numSubmits; i++) {
                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-                short k = tableId;
+                short calculatedTableId = tableId;
                 for (; sourceIp <= newBatchSize; sourceIp++) {
-                    String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+                    String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
                     Flow flow = null;
                     if (add) {
                         Match match = BulkOMaticUtils.getMatch(sourceIp);
-                        flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+                        flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
                     }
 
                     addFlowToTx(writeTransaction, flowId,
-                            BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
+                            BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
+                            calculatedTableId);
 
                     if (sourceIp < newBatchSize) {
-                        short a = 1;
-                        short b = (short)(endTableId - startTableId + 1);
-                        k = (short) (((k + a) % b) + startTableId);
+                        short numberA = 1;
+                        short numberB = (short) (endTableId - startTableId + 1);
+                        calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
                     }
                 }
-                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
+                writeTransaction.commit().addCallback(
+                        new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
                 // Wrap around
-                tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
+                tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
                 newBatchSize += batchSize;
-                if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
+                if (i % sleepAfter == 0 && sleepMillis > 0) {
                     try {
                         Thread.sleep(sleepMillis);
                     } catch (InterruptedException e) {
@@ -159,23 +146,24 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             }
         }
 
-        private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
+        private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
+                Flow flow, Integer sourceIp, Short tableId) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
             }
         }
 
-        private class DsCallBack implements FutureCallback {
-            private String dpId;
-            private int sourceIp;
-            private short endTableId;
-            private short beginTableId;
+        private class DsCallBack implements FutureCallback<Object> {
+            private final String dpId;
+            private final int sourceIp;
+            private final short endTableId;
+            private final short beginTableId;
 
-            public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+            DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
                 this.endTableId = endTableId;
@@ -183,29 +171,28 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             }
 
             @Override
-            public void onSuccess(Object o) {
+            public void onSuccess(Object object) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
-                    if(0 == countDpnWriteCompletion.decrementAndGet() &&
-                            writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
+                    if (0 == countDpnWriteCompletion.decrementAndGet()
+                            && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
                         taskCompletionTime.set(dur);
                     }
                 }
             }
 
+            @Override
             public void onFailure(Throwable error) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
                 }
-                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
-                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
+                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
+                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
             }
         }
     }
-}
\ No newline at end of file
+}