Switch to MD-SAL APIs
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
index 65c014db380be0dd9ed17ddcde271a7a512186ae..ae73a6248c50511a17d2db11cc2c08246b261557 100644 (file)
@@ -8,13 +8,13 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -23,13 +23,14 @@ import org.slf4j.LoggerFactory;
 
 public class FlowWriterConcurrent implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
-    public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
+    public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
+            "Using Concurrent implementation of Flow Writer.";
     private final DataBroker dataBroker;
     private final ExecutorService flowPusher;
     private long startTime;
-    private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
-    private AtomicLong taskCompletionTime = new AtomicLong();
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private final AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
@@ -37,26 +38,25 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
     }
 
-    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId,
-                         boolean isCreateParents) {
+    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
+            short startTableId, short endTableId, boolean isCreateParents) {
         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
+                    sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
 
-    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                            short startTableId, short endTableId) {
+    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
+            short endTableId) {
         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId, false);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
+                    startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
@@ -80,18 +80,14 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         private final int sleepMillis;
         private final short startTableId;
         private final short endTableId;
-        private AtomicInteger remainingTxReturn = new AtomicInteger(0);
+        private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
         private final boolean isCreateParents;
 
-        public FlowHandlerTask(final String dpId,
-                               final int flowsPerDpn,
-                               final boolean add,
-                               final int batchSize,
-                               final int sleepMillis,
-                               final int sleepAfter,
-                               final short startTableId,
-                               final short endTableId,
-                               final boolean isCreateParents){
+        FlowHandlerTask(final String dpId, final int flowsPerDpn,
+                final boolean add, final int batchSize,
+                final int sleepMillis, final int sleepAfter,
+                final short startTableId, final short endTableId,
+                final boolean isCreateParents) {
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -101,43 +97,46 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             this.startTableId = startTableId;
             this.endTableId = endTableId;
             this.isCreateParents = isCreateParents;
-            remainingTxReturn.set(flowsPerDpn/batchSize);
+            remainingTxReturn.set(flowsPerDpn / batchSize);
         }
 
         @Override
         public void run() {
-            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
+            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
+                    flowsPerDpn / batchSize);
             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
             short tableId = startTableId;
-            int numSubmits = flowsPerDpn/batchSize;
+            int numSubmits = flowsPerDpn / batchSize;
             int sourceIp = 1;
             int newBatchSize = batchSize;
 
             for (int i = 1; i <= numSubmits; i++) {
                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-                short k = tableId;
+                short calculatedTableId = tableId;
                 for (; sourceIp <= newBatchSize; sourceIp++) {
-                    String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+                    String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
                     Flow flow = null;
                     if (add) {
                         Match match = BulkOMaticUtils.getMatch(sourceIp);
-                        flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+                        flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
                     }
 
                     addFlowToTx(writeTransaction, flowId,
-                            BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
+                            BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
+                            calculatedTableId);
 
                     if (sourceIp < newBatchSize) {
-                        short a = 1;
-                        short b = (short)(endTableId - startTableId + 1);
-                        k = (short) (((k + a) % b) + startTableId);
+                        short numberA = 1;
+                        short numberB = (short) (endTableId - startTableId + 1);
+                        calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
                     }
                 }
-                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
+                writeTransaction.commit().addCallback(
+                        new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
                 // Wrap around
-                tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
+                tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
                 newBatchSize += batchSize;
-                if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
+                if (i % sleepAfter == 0 && sleepMillis > 0) {
                     try {
                         Thread.sleep(sleepMillis);
                     } catch (InterruptedException e) {
@@ -147,7 +146,8 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             }
         }
 
-        private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
+        private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
+                Flow flow, Integer sourceIp, Short tableId) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
@@ -157,13 +157,13 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             }
         }
 
-        private class DsCallBack implements FutureCallback {
-            private String dpId;
-            private int sourceIp;
-            private short endTableId;
-            private short beginTableId;
+        private class DsCallBack implements FutureCallback<Object> {
+            private final String dpId;
+            private final int sourceIp;
+            private final short endTableId;
+            private final short beginTableId;
 
-            public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+            DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
                 this.endTableId = endTableId;
@@ -171,29 +171,28 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             }
 
             @Override
-            public void onSuccess(Object o) {
+            public void onSuccess(Object object) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
-                    if(0 == countDpnWriteCompletion.decrementAndGet() &&
-                            writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
+                    if (0 == countDpnWriteCompletion.decrementAndGet()
+                            && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
                         taskCompletionTime.set(dur);
                     }
                 }
             }
 
+            @Override
             public void onFailure(Throwable error) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
                 }
-                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
-                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
+                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
+                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
             }
         }
     }
-}
\ No newline at end of file
+}