Merge "Bug 8408 - Deserialization exception in logs when NAT flows are added."
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
index 3bd424911e165c56d14b66eb04e81833bec9b06b..660b8d0797c9a7a1f2b27bb22d02c10557d9f225 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -22,19 +25,14 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowWriterTxChain implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
     private final DataBroker dataBroker;
     private final ExecutorService flowPusher;
     private long startTime;
     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
-    private AtomicLong taskCompletionTime = new AtomicLong(0);
-    private final String UNITS = "ns";
+    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
         this.dataBroker = dataBroker;
@@ -43,13 +41,14 @@ public class FlowWriterTxChain implements FlowCounterMBean {
     }
 
     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
+                         int sleepMillis, int sleepAfter, short startTableId, short endTableId,
+                         boolean isCreateParents) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
@@ -60,21 +59,11 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId);
+                    0, 1, startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
 
-    @Override
-    public long getFlowCount() {
-        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
-    }
-
-    @Override
-    public int getReadOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
     @Override
     public int getWriteOpStatus() {
         return writeOpStatus.get();
@@ -85,11 +74,6 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
-
     private class FlowHandlerTask implements Runnable, TransactionChainListener {
         private final String dpId;
         private final boolean add;
@@ -99,6 +83,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private final int sleepMillis;
         private final short startTableId;
         private final short endTableId;
+        private final boolean isCreateParents;
         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
 
         BindingTransactionChain txChain;
@@ -110,7 +95,8 @@ public class FlowWriterTxChain implements FlowCounterMBean {
                                final int sleepMillis,
                                final int sleepAfter,
                                final short startTableId,
-                               final short endTableId){
+                               final short endTableId,
+                               final boolean isCreateParents){
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -119,6 +105,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             this.sleepAfter = sleepAfter;
             this.startTableId = startTableId;
             this.endTableId = endTableId;
+            this.isCreateParents = isCreateParents;
             remainingTxReturn.set(flowsPerDpn/batchSize);
         }
 
@@ -191,7 +178,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);