Merge "OPNFLWPLUG-963 : Updating ports delete reason from OFP:"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterSequential.java
index c6c6b841a8762a785e87aed7708d0b30de32532c..7e8fe0009fab658b03f81e8f2ae3379d13100036 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -9,6 +9,8 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,9 +29,9 @@ public class FlowWriterSequential implements FlowCounterMBean {
     private final ExecutorService flowPusher;
     protected int dpnCount;
     private long startTime;
-    private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
-    private AtomicLong taskCompletionTime = new AtomicLong();
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private final AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
@@ -37,24 +39,24 @@ public class FlowWriterSequential implements FlowCounterMBean {
         LOG.info("Using Sequential implementation of Flow Writer.");
     }
 
-    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
-                         short startTableId, short endTableId, boolean isCreateParents) {
+    public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
+            short endTableId, boolean isCreateParents) {
         LOG.info("Using Sequential implementation of Flow Writer.");
-        this.dpnCount = dpnCount;
-        countDpnWriteCompletion.set(dpnCount);
+        this.dpnCount = count;
+        countDpnWriteCompletion.set(count);
         startTime = System.nanoTime();
-        for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
-                    sleepMillis, startTableId, endTableId, isCreateParents);
+        for (int i = 1; i <= count; i++) {
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
+                    startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
 
-    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
-                            short endTableId) {
+    public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
+            short endTableId) {
         LOG.info("Using Sequential implementation of Flow Writer.");
-        countDpnWriteCompletion.set(dpnCount);
-        for (int i = 1; i <= dpnCount; i++) {
+        countDpnWriteCompletion.set(count);
+        for (int i = 1; i <= count; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
                     startTableId, endTableId, false);
             flowPusher.execute(task);
@@ -81,14 +83,14 @@ public class FlowWriterSequential implements FlowCounterMBean {
         private final short endTableId;
         private final boolean isCreateParents;
 
-        public FlowHandlerTask(final String dpId,
-                               final int flowsPerDpn,
-                               final boolean add,
-                               final int batchSize,
-                               int sleepMillis,
-                               final short startTableId,
-                               final short endTableId,
-                               final boolean isCreateParents){
+        FlowHandlerTask(final String dpId,
+                        final int flowsPerDpn,
+                        final boolean add,
+                        final int batchSize,
+                        int sleepMillis,
+                        final short startTableId,
+                        final short endTableId,
+                        final boolean isCreateParents) {
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -101,40 +103,43 @@ public class FlowWriterSequential implements FlowCounterMBean {
 
         @Override
         public void run() {
-            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
+            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
+                    flowsPerDpn / batchSize);
             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
 
             Short tableId = startTableId;
             Integer sourceIp = 1;
 
             WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-            short k = tableId;
+            short calculatedTableId = tableId;
 
             for (; sourceIp <= batchSize; sourceIp++) {
-                String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+                String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
                 LOG.debug("Adding flow with id: {}", flowId);
                 Flow flow = null;
                 if (add) {
                     Match match = BulkOMaticUtils.getMatch(sourceIp);
-                    flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+                    flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
                 }
                 addFlowToTx(writeTransaction, flowId,
-                        BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
+                        BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
 
                 if (sourceIp < batchSize) {
-                    short a = 1;
-                    short b = (short)(endTableId - startTableId + 1);
-                    k = (short) (((k + a) % b) + startTableId);
+                    short numberA = 1;
+                    short numberB = (short) (endTableId - startTableId + 1);
+                    calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
                 }
             }
 
-            LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp);
+            LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
+                    calculatedTableId, sourceIp);
 
-            Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
+            Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
+                    MoreExecutors.directExecutor());
         }
 
         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
-                                 Flow flow) {
+                Flow flow) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
@@ -145,26 +150,26 @@ public class FlowWriterSequential implements FlowCounterMBean {
         }
 
         private class DsCallBack implements FutureCallback {
-            private String dpId;
+            private final String dpId;
             private Integer sourceIp;
-            private Short tableId;
+            private final Short tableId;
 
-            public DsCallBack(String dpId, Integer sourceIp, Short tableId) {
+            DsCallBack(String dpId, Integer sourceIp, Short tableId) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
-                short a = 1;
-                short b = (short)(endTableId - startTableId + 1);
-                this.tableId = (short) (((tableId + a) % b) + startTableId);
+                short numberA = 1;
+                short numberB = (short) (endTableId - startTableId + 1);
+                this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
             }
 
             @Override
-            public void onSuccess(Object o) {
+            public void onSuccess(Object object) {
                 if (sourceIp > flowsPerDpn) {
                     long dur = System.nanoTime() - startTime;
                     LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
                             tableId, sourceIp, dur);
-                    if(0 == countDpnWriteCompletion.decrementAndGet() &&
-                            writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
+                    if (0 == countDpnWriteCompletion.decrementAndGet()
+                            && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
                         taskCompletionTime.set(dur);
                     }
@@ -180,34 +185,37 @@ public class FlowWriterSequential implements FlowCounterMBean {
 
                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
                 int newBatchSize = sourceIp + batchSize - 1;
-                short k = tableId;
+                short calculatedTableId = tableId;
                 for (; sourceIp <= newBatchSize; sourceIp++) {
-                    String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+                    String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
                     Flow flow = null;
                     if (add) {
                         Match match = BulkOMaticUtils.getMatch(sourceIp);
-                        flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+                        flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
                     }
                     LOG.debug("Adding flow with id: {}", flowId);
                     addFlowToTx(writeTransaction, flowId,
-                            BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
+                            BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
+                            flow);
 
                     if (sourceIp < newBatchSize) {
-                        short a = 1;
-                        short b = (short)(endTableId - startTableId + 1);
-                        k = (short) (((k + a) % b) + startTableId);
+                        short numberA = 1;
+                        short numberB = (short) (endTableId - startTableId + 1);
+                        calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
                     }
                 }
                 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
-                        dpId, tableId, k, sourceIp);
-                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
+                        dpId, tableId, calculatedTableId, sourceIp);
+                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
+                        MoreExecutors.directExecutor());
             }
 
+            @Override
             public void onFailure(Throwable error) {
-                LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}",
-                        error, dpId, tableId, sourceIp);
+                LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
+                        tableId, sourceIp);
                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
             }
         }
     }
-}
\ No newline at end of file
+}