Bug 8293: Add table writer to bulk-o-matic 58/54758/8
authorFaiz Ilahi Kothari <faiz.ilahi.k.kothari@ericsson.com>
Thu, 9 Feb 2017 05:57:43 +0000 (11:27 +0530)
committerFaiz Ilahi Kothari <faiz.ilahi.k.kothari@ericsson.com>
Thu, 4 May 2017 09:39:52 +0000 (15:09 +0530)
* Table writer is introduced for Pre-Leader Role test in which
  table is added first and flows next. We want the ability to
  create tables separately and not to be created automatically
  on addition of flows.
* Modify flow-test rpc to accomodate a boolean: create-parents.
  This allows us to prevent addition of flows in case the tables
  are missing.
* Fix import ordering

Change-Id: Ic3fe4764631d9cbc7550ad3d7d926debb3ca6977
Signed-off-by: Faiz Ilahi Kothari <faiz.ilahi.k.kothari@ericsson.com>
16 files changed:
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/BulkOMaticUtils.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowCounter.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowCounterMBean.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowReader.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterConcurrent.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterSequential.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChain.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriter.java [new file with mode: 0644]
applications/bulk-o-matic/src/main/yang/sal-bulk-flow.yang
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterConcurrentTest.java
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterSequentialTest.java
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChainTest.java
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImplTest.java
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriterTest.java [new file with mode: 0644]

index 8c80b15b03653c6d7c03a5e22f1d4d0e0ccaab3b..8a750d56e397007ad085a199084f7d2f28d29302 100644 (file)
@@ -32,6 +32,7 @@ public final class BulkOMaticUtils {
 
     public static final int DEFUALT_STATUS = FlowCounter.OperationStatus.INIT.status();
     public static final int DEFAULT_FLOW_COUNT = 0;
+    public static final int DEFAULT_TABLE_COUNT = 0;
     public static final long DEFAULT_COMPLETION_TIME = 0;
     public static final String DEFAULT_UNITS = "ns";
     public static final String DEVICE_TYPE_PREFIX = "openflow:";
index f0c42537b688642fdf560df52118edcb79363784..32716eaaada2ede3559c3c51450055852f8ac796 100644 (file)
@@ -78,4 +78,12 @@ public class FlowCounter implements FlowCounterMBean {
             return BulkOMaticUtils.DEFAULT_UNITS;
         }
     }
+
+    @Override
+    public long getTableCount() {
+        if (writer != null) {
+            return writer.getTableCount();
+        }
+        return BulkOMaticUtils.DEFAULT_TABLE_COUNT;
+    }
 }
index 7d5de72fcf23787ee1abe9ef0b5113453614e23f..b1ddb4a6fceb46bcd6109f6bf4643e2421b4d935 100644 (file)
@@ -9,14 +9,28 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 public interface FlowCounterMBean {
 
-    public long getFlowCount();
+    default public long getFlowCount() {
+        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
+    }
 
-    public int getReadOpStatus();
+    default public int getReadOpStatus() {
+        return BulkOMaticUtils.DEFUALT_STATUS;
+    }
 
-    public int getWriteOpStatus();
+    default public int getWriteOpStatus() {
+        return BulkOMaticUtils.DEFUALT_STATUS;
+    }
 
-    public long getTaskCompletionTime();
+    default public long getTaskCompletionTime() {
+        return BulkOMaticUtils.DEFAULT_COMPLETION_TIME;
+    }
 
-    public String getUnits();
+    default public String getUnits() {
+        return BulkOMaticUtils.DEFAULT_UNITS;
+    }
+
+    default public long getTableCount() {
+        return BulkOMaticUtils.DEFAULT_TABLE_COUNT;
+    }
 }
 
index 3796081892a79f570c5f702ca56a7aae85f29986..70c3bbbeb58b877832daa0162ebcf5d9be4e344d 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.base.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -26,9 +28,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowReader implements Runnable, FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowReader.class);
     private final DataBroker dataBroker;
@@ -38,9 +37,8 @@ public class FlowReader implements Runnable, FlowCounterMBean {
     private final short startTableId;
     private final short endTableId;
     private final boolean isConfigDs;
-    private AtomicLong flowCount = new AtomicLong(0);
+    private AtomicLong flowCount = new AtomicLong();
     private AtomicInteger readOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private static final String UNITS = "ns";
 
     private FlowReader(final DataBroker dataBroker,
                       final Integer dpnCount,
@@ -135,19 +133,4 @@ public class FlowReader implements Runnable, FlowCounterMBean {
     public int getReadOpStatus() {
         return readOpStatus.get();
     }
-
-    @Override
-    public int getWriteOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
-    @Override
-    public long getTaskCompletionTime() {
-        return BulkOMaticUtils.DEFAULT_COMPLETION_TIME;
-    }
-
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
 }
\ No newline at end of file
index 96904bd8d6a460d6dadaefa1d587b49407dd89f2..65c014db380be0dd9ed17ddcde271a7a512186ae 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -18,10 +21,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowWriterConcurrent implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
@@ -29,9 +28,8 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
     private final ExecutorService flowPusher;
     private long startTime;
     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
-    private AtomicLong taskCompletionTime = new AtomicLong(0);
-    private static final String UNITS = "ns";
+    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
@@ -40,13 +38,14 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
     }
 
     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
+                         int sleepMillis, int sleepAfter, short startTableId, short endTableId,
+                         boolean isCreateParents) {
         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
@@ -57,21 +56,11 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId);
+                    0, 1, startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
 
-    @Override
-    public long getFlowCount() {
-        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
-    }
-
-    @Override
-    public int getReadOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
     @Override
     public int getWriteOpStatus() {
         return writeOpStatus.get();
@@ -82,11 +71,6 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
-
     private class FlowHandlerTask implements Runnable {
         private final String dpId;
         private final boolean add;
@@ -97,6 +81,7 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         private final short startTableId;
         private final short endTableId;
         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
+        private final boolean isCreateParents;
 
         public FlowHandlerTask(final String dpId,
                                final int flowsPerDpn,
@@ -105,7 +90,8 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
                                final int sleepMillis,
                                final int sleepAfter,
                                final short startTableId,
-                               final short endTableId){
+                               final short endTableId,
+                               final boolean isCreateParents){
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -114,6 +100,7 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
             this.sleepAfter = sleepAfter;
             this.startTableId = startTableId;
             this.endTableId = endTableId;
+            this.isCreateParents = isCreateParents;
             remainingTxReturn.set(flowsPerDpn/batchSize);
         }
 
@@ -163,7 +150,7 @@ public class FlowWriterConcurrent implements FlowCounterMBean {
         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
index bfb764c07ac7b39d735699bf65ab4cf6ac9989f7..59d8409e56de777777b31604f05a7997502d3241 100644 (file)
@@ -8,6 +8,12 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.base.Optional;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -27,13 +33,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
 public class FlowWriterDirectOFRpc {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpc.class);
index 99436bec24e60dc96f78717cd5df5af4bbe821fc..c6c6b841a8762a785e87aed7708d0b30de32532c 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -18,10 +21,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowWriterSequential implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
     private final DataBroker dataBroker;
@@ -29,9 +28,8 @@ public class FlowWriterSequential implements FlowCounterMBean {
     protected int dpnCount;
     private long startTime;
     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
-    private AtomicLong taskCompletionTime = new AtomicLong(0);
-    private static final String UNITS = "ns";
+    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
@@ -40,14 +38,14 @@ public class FlowWriterSequential implements FlowCounterMBean {
     }
 
     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
-                         short startTableId, short endTableId) {
+                         short startTableId, short endTableId, boolean isCreateParents) {
         LOG.info("Using Sequential implementation of Flow Writer.");
         this.dpnCount = dpnCount;
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
-                    sleepMillis, startTableId, endTableId);
+                    sleepMillis, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
@@ -58,21 +56,11 @@ public class FlowWriterSequential implements FlowCounterMBean {
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
-                    startTableId, endTableId);
+                    startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
 
-    @Override
-    public long getFlowCount() {
-        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
-    }
-
-    @Override
-    public int getReadOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
     @Override
     public int getWriteOpStatus() {
         return writeOpStatus.get();
@@ -83,11 +71,6 @@ public class FlowWriterSequential implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
-
     private class FlowHandlerTask implements Runnable {
         private final String dpId;
         private final int flowsPerDpn;
@@ -96,6 +79,7 @@ public class FlowWriterSequential implements FlowCounterMBean {
         private final int sleepMillis;
         private final short startTableId;
         private final short endTableId;
+        private final boolean isCreateParents;
 
         public FlowHandlerTask(final String dpId,
                                final int flowsPerDpn,
@@ -103,7 +87,8 @@ public class FlowWriterSequential implements FlowCounterMBean {
                                final int batchSize,
                                int sleepMillis,
                                final short startTableId,
-                               final short endTableId){
+                               final short endTableId,
+                               final boolean isCreateParents){
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -111,6 +96,7 @@ public class FlowWriterSequential implements FlowCounterMBean {
             this.sleepMillis = sleepMillis;
             this.startTableId = startTableId;
             this.endTableId = endTableId;
+            this.isCreateParents = isCreateParents;
         }
 
         @Override
@@ -151,7 +137,7 @@ public class FlowWriterSequential implements FlowCounterMBean {
                                  Flow flow) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
index 3bd424911e165c56d14b66eb04e81833bec9b06b..660b8d0797c9a7a1f2b27bb22d02c10557d9f225 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -22,19 +25,14 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class FlowWriterTxChain implements FlowCounterMBean {
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
     private final DataBroker dataBroker;
     private final ExecutorService flowPusher;
     private long startTime;
     private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
-    private AtomicLong taskCompletionTime = new AtomicLong(0);
-    private final String UNITS = "ns";
+    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private AtomicLong taskCompletionTime = new AtomicLong();
 
     public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
         this.dataBroker = dataBroker;
@@ -43,13 +41,14 @@ public class FlowWriterTxChain implements FlowCounterMBean {
     }
 
     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
+                         int sleepMillis, int sleepAfter, short startTableId, short endTableId,
+                         boolean isCreateParents) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
@@ -60,21 +59,11 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId);
+                    0, 1, startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
 
-    @Override
-    public long getFlowCount() {
-        return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
-    }
-
-    @Override
-    public int getReadOpStatus() {
-        return BulkOMaticUtils.DEFUALT_STATUS;
-    }
-
     @Override
     public int getWriteOpStatus() {
         return writeOpStatus.get();
@@ -85,11 +74,6 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    @Override
-    public String getUnits() {
-        return UNITS;
-    }
-
     private class FlowHandlerTask implements Runnable, TransactionChainListener {
         private final String dpId;
         private final boolean add;
@@ -99,6 +83,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private final int sleepMillis;
         private final short startTableId;
         private final short endTableId;
+        private final boolean isCreateParents;
         private AtomicInteger remainingTxReturn = new AtomicInteger(0);
 
         BindingTransactionChain txChain;
@@ -110,7 +95,8 @@ public class FlowWriterTxChain implements FlowCounterMBean {
                                final int sleepMillis,
                                final int sleepAfter,
                                final short startTableId,
-                               final short endTableId){
+                               final short endTableId,
+                               final boolean isCreateParents){
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -119,6 +105,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             this.sleepAfter = sleepAfter;
             this.startTableId = startTableId;
             this.endTableId = endTableId;
+            this.isCreateParents = isCreateParents;
             remainingTxReturn.set(flowsPerDpn/batchSize);
         }
 
@@ -191,7 +178,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
index 2ddfc89da39af52eae3b560cb05ce7cc28fadcb3..1a4dc4369ae6d6ab8c270d797455e09c35ae0d6f 100644 (file)
@@ -43,6 +43,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
@@ -244,7 +246,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
-                        input.getEndTableId().shortValue());
+                        input.getEndTableId().shortValue(), input.isCreateParents());
             } else {
                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
@@ -259,7 +261,8 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
             if (input.isIsAdd()){
                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
-                        input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+                        input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
+                        input.isCreateParents());
             } else {
                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
@@ -272,7 +275,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
-                        input.getEndTableId().shortValue());
+                        input.getEndTableId().shortValue(), input.isCreateParents());
             } else {
                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
@@ -283,6 +286,27 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
+    @Override
+    public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
+        final TableWriter writer = new TableWriter(dataBroker, fjService);
+        flowCounterBeanImpl.setWriter(writer);
+        switch (input.getOperation()) {
+            case Add:
+                writer.addTables(input.getDpnCount().intValue(),
+                    input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+                break;
+            case Delete:
+                writer.deleteTables(input.getDpnCount().intValue(),
+                    input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+                break;
+            default:
+                RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
+                return Futures.immediateFuture(rpcResultBuilder.build());
+        }
+        RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
+        return Futures.immediateFuture(rpcResultBuilder.build());
+    }
+
     @Override
     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriter.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriter.java
new file mode 100644 (file)
index 0000000..5bf5716
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2017 Ericsson Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.bulk.o.matic;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TableWriter implements FlowCounterMBean {
+    private final Logger LOG = LoggerFactory.getLogger(TableWriter.class);
+
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicLong taskCompletionTime = new AtomicLong(BulkOMaticUtils.DEFAULT_COMPLETION_TIME);
+    private final AtomicInteger successfulWrites = new AtomicInteger();
+    private final AtomicInteger failedWrites = new AtomicInteger();
+    private final DataBroker dataBroker;
+    private final ExecutorService tablePusher;
+
+    public TableWriter(final DataBroker dataBroker, final ExecutorService tablePusher) {
+        this.dataBroker = dataBroker;
+        this.tablePusher = tablePusher;
+    }
+
+    public void addTables(final int dpnCount, final short startTableId, final short endTableId) {
+        LOG.info("Starting to add tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
+        TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, true);
+        tablePusher.execute(task);
+    }
+
+    public void deleteTables(int dpnCount, short startTableId, short endTableId) {
+        LOG.info("Starting to delete tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
+        TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, false);
+        tablePusher.execute(task);
+    }
+
+    @Override
+    public int getWriteOpStatus() {
+        return writeOpStatus.get();
+    }
+
+    @Override
+    public long getTaskCompletionTime() {
+        return taskCompletionTime.get();
+    }
+
+    @Override
+    public long getTableCount() {
+        return successfulWrites.get();
+    }
+
+    private class TableHandlerTask implements Runnable {
+
+        private short startTableId;
+        private short endTableId;
+        private int dpnCount;
+        private boolean isAdd;
+
+        public TableHandlerTask(int dpnCount, short startTableId, short endTableId, boolean isAdd) {
+            this.dpnCount = dpnCount;
+            this.startTableId = startTableId;
+            this.endTableId = endTableId;
+            this.isAdd = isAdd;
+        }
+
+        @Override
+        public void run() {
+            writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
+            int totalTables = dpnCount * (endTableId - startTableId + 1);
+
+            for (int dpn = 1; dpn <= dpnCount; dpn++) {
+                String dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + String.valueOf(dpn);
+                for (short tableId = startTableId; tableId <= endTableId; tableId++) {
+                    WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
+                    Table table = new TableBuilder().setKey(new TableKey(tableId))
+                            .setId(tableId)
+                            .build();
+                    InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
+
+                    if (isAdd) {
+                        wtx.put(LogicalDatastoreType.CONFIGURATION, tableIId, table, true);
+                    } else {
+                        wtx.delete(LogicalDatastoreType.CONFIGURATION, tableIId);
+                    }
+
+                    CheckedFuture<Void, TransactionCommitFailedException> future = wtx.submit();
+
+                    Futures.addCallback(future, new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(@Nullable Void v) {
+                            if (successfulWrites.incrementAndGet() == totalTables) {
+                                if (failedWrites.get() > 0) {
+                                    writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
+                                } else {
+                                    writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Throwable throwable) {
+                            LOG.error("Table addition Failed. Error: {}", throwable);
+                            if (failedWrites.incrementAndGet() == totalTables) {
+                                writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
+                            }
+                        }
+                    });
+                }
+            }
+        }
+    }
+}
index 965b335c4536c868e5c1b5efe77c3f26acf2973d..3eda3a0342bd1430af041fc255bb3b4b2a1ab37d 100644 (file)
@@ -73,68 +73,74 @@ module sal-bulk-flow {
     }
 
     rpc flow-test {
-       input {
-           leaf is-add {
-               type boolean;
-               mandatory true;
-               status current;
-               description "Add or delete";
-           }
-           leaf dpn-count {
-               type uint32;
-               mandatory true;
-               status current;
-               description "No of DPNs";
-           }
-           leaf flows-per-dpn {
-               type uint32;
-               mandatory true;
-               status current;
-               description "Flows to be pushed per DPN";
-           }
-           leaf start-table-id {
-               type uint32;
-               mandatory true;
-               status current;
-               description "Start adding flows from this table id";
-           }
-           leaf end-table-id {
-               type uint32;
-               mandatory true;
-               status current;
-               description "The last table to add flows to and then wrap around";
-           }
-           leaf batch-size {
-               type uint32;
-               mandatory true;
-               status current;
-               description "batch size";
-           }
-           leaf seq {
-               type boolean;
-               mandatory true;
-               status current;
-               description "Whether to use sequential or concurrent writer";
-           }
-           leaf tx-chain {
-               type boolean;
-               mandatory true;
-               status current;
-               description "Whether to use PingPong Broker or not. seq is ignored.";
-           }
-           leaf sleep-for {
-               type uint32;
-               mandatory true;
-               status current;
-               description "sleep for the given milliseconds";
-           }
-           leaf sleep-after {
-               type uint32;
-               mandatory true;
-               status current;
-               description "Sleep after the given number of iterations. Will be used in the concurrent case only";
-           }
-       }
+        input {
+            leaf create-parents {
+                type boolean;
+                mandatory true;
+                status current;
+                description "Create parents if doesn't exist";
+            }
+            leaf is-add {
+                type boolean;
+                mandatory true;
+                status current;
+                description "Add or delete";
+            }
+            leaf dpn-count {
+                type uint32;
+                mandatory true;
+                status current;
+                description "No of DPNs";
+            }
+            leaf flows-per-dpn {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Flows to be pushed per DPN";
+            }
+            leaf start-table-id {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Start adding flows from this table id";
+            }
+            leaf end-table-id {
+                type uint32;
+                mandatory true;
+                status current;
+                description "The last table to add flows to and then wrap around";
+            }
+            leaf batch-size {
+                type uint32;
+                mandatory true;
+                status current;
+                description "batch size";
+            }
+            leaf seq {
+                type boolean;
+                mandatory true;
+                status current;
+                description "Whether to use sequential or concurrent writer";
+            }
+            leaf tx-chain {
+                type boolean;
+                mandatory true;
+                status current;
+                description "Whether to use PingPong Broker or not. seq is ignored.";
+            }
+            leaf sleep-for {
+                type uint32;
+                mandatory true;
+                status current;
+                description "sleep for the given milliseconds";
+            }
+            leaf sleep-after {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Sleep after the given number of iterations. Will be used in the concurrent case only";
+            }
+        }
     }
 
     rpc read-flow-test {
@@ -217,4 +223,38 @@ module sal-bulk-flow {
            }
        }
     }
+
+    rpc table-test {
+        input {
+            leaf operation {
+                type enumeration {
+                    enum add;
+                    enum delete;
+                }
+                status current;
+                description "Type of operation, add or delete";
+            }
+
+            leaf dpn-count {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Total number of dpns to add these tables";
+            }
+
+            leaf start-table-id {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Starting table id";
+            }
+
+            leaf end-table-id {
+                type uint32;
+                mandatory true;
+                status current;
+                description "Last table id";
+            }
+        }
+    }
 }
index 53a27b31c09f5d6e6330f61ed807b1b93f96c309..769f70d9fda38e47dd04f7ae5cf35781c4be0b18 100644 (file)
@@ -68,7 +68,7 @@ public class FlowWriterConcurrentTest {
     }
     @Test
     public void testAddFlows() throws Exception {
-        flowWriterConcurrent.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1);
+        flowWriterConcurrent.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1, true);
         Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
     }
 
index 5b570543556b9441434cefd7bbdffae948bfad1b..0aa5cee7966124ae41666b6333016cf13c372ee8 100644 (file)
@@ -65,7 +65,7 @@ public class FlowWriterSequentialTest {
     }
     @Test
     public void testAddFlows() throws Exception {
-        flowWriterSequential.addFlows(1, FLOWS_PER_DPN, 10, 10, (short)0, (short)1);
+        flowWriterSequential.addFlows(1, FLOWS_PER_DPN, 10, 10, (short)0, (short)1, true);
         Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
     }
 
index 9f3e239972117cd486be8ba4f9338b1e7677e980..e5b003b757bca04dd25974a660cf8528d4f20632 100644 (file)
@@ -72,7 +72,7 @@ public class FlowWriterTxChainTest {
     }
     @Test
     public void testAddFlows() throws Exception {
-        flowWriterTxChain.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1);
+        flowWriterTxChain.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1, true);
         Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
     }
 
index 2940240945c33e45e78b780b9816e85c14595a21..f8c564d1775a07656a036cfee0ce5e9f1606c574 100644 (file)
@@ -47,6 +47,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItemBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.list.grouping.BulkFlowItem;
@@ -216,7 +219,8 @@ public class SalBulkFlowServiceImplTest {
                 .setSleepAfter(20L)
                 .setSleepFor(1L)
                 .setStartTableId(1L)
-                .setTxChain(true);
+                .setTxChain(true)
+                .setCreateParents(true);
 
         FlowTestInput flowTestInput = flowTestInputBuilder.build();
 
@@ -261,4 +265,22 @@ public class SalBulkFlowServiceImplTest {
 
         Assert.assertTrue(salBulkFlowService.flowRpcAddMultiple(flowRpcAddMultipleInput).get().isSuccessful());
     }
+
+    @Test
+    public void testTableTest() throws Exception {
+        final TableTestInputBuilder tableTestInputBuilder = new TableTestInputBuilder()
+                .setStartTableId(0L)
+                .setEndTableId(99L)
+                .setDpnCount(1L)
+                .setOperation(Operation.Add);
+
+        TableTestInput tableTestInput = tableTestInputBuilder.build();
+
+        Assert.assertTrue(salBulkFlowService.tableTest(tableTestInput).get().isSuccessful());
+
+        tableTestInputBuilder.setOperation(Operation.Delete);
+        tableTestInput = tableTestInputBuilder.build();
+
+        Assert.assertTrue(salBulkFlowService.tableTest(tableTestInput).get().isSuccessful());
+    }
 }
\ No newline at end of file
diff --git a/applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriterTest.java b/applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/TableWriterTest.java
new file mode 100644 (file)
index 0000000..775ecd0
--- /dev/null
@@ -0,0 +1,80 @@
+/**
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.bulk.o.matic;
+
+import static org.mockito.Mockito.doReturn;
+
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link FlowWriterSequential}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TableWriterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TableWriterTest.class);
+
+    private static final int TABLES_PER_DPN = 100;
+    private static final int DPN_COUNT = 1;
+    private static final short START_TABLE_ID = 0;
+    private static final short END_TABLE_ID = 99;
+
+    @Mock
+    private DataBroker mockDataBroker;
+    @Mock
+    private ExecutorService mockTablePusher;
+    @Mock
+    private WriteTransaction wTx;
+
+    private TableWriter tableWriter;
+
+    @Before
+    public void setUp() throws Exception {
+
+        doReturn(wTx).when(mockDataBroker).newWriteOnlyTransaction();
+        Mockito.when(wTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                ((Runnable)invocation.getArguments()[0]).run();
+                return null;
+            }
+        }).when(mockTablePusher).execute(Matchers.<Runnable>any());
+
+        tableWriter = new TableWriter(mockDataBroker, mockTablePusher);
+    }
+    @Test
+    public void testAddTables() throws Exception {
+        tableWriter.addTables(DPN_COUNT, START_TABLE_ID, END_TABLE_ID);
+        Mockito.verify(wTx, Mockito.times(TABLES_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
+    }
+
+    @Test
+    public void testDeleteTables() throws Exception {
+        tableWriter.deleteTables(DPN_COUNT, START_TABLE_ID, END_TABLE_ID);
+        Mockito.verify(wTx, Mockito.times(TABLES_PER_DPN)).delete(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any());
+    }
+}
\ No newline at end of file