import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
LOG.info("Using Sequential implementation of Flow Writer.");
}
- public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
+ public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
short endTableId, boolean isCreateParents) {
LOG.info("Using Sequential implementation of Flow Writer.");
- this.dpnCount = dpnCount;
- countDpnWriteCompletion.set(dpnCount);
+ this.dpnCount = count;
+ countDpnWriteCompletion.set(count);
startTime = System.nanoTime();
- for (int i = 1; i <= dpnCount; i++) {
+ for (int i = 1; i <= count; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
startTableId, endTableId, isCreateParents);
flowPusher.execute(task);
}
}
- public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
+ public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
short endTableId) {
LOG.info("Using Sequential implementation of Flow Writer.");
- countDpnWriteCompletion.set(dpnCount);
- for (int i = 1; i <= dpnCount; i++) {
+ countDpnWriteCompletion.set(count);
+ for (int i = 1; i <= count; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
startTableId, endTableId, false);
flowPusher.execute(task);
LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
calculatedTableId, sourceIp);
- Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId));
+ Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
+ MoreExecutors.directExecutor());
}
private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
}
LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
dpId, tableId, calculatedTableId, sourceIp);
- Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId));
+ Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
+ MoreExecutors.directExecutor());
}
@Override