import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlowWriterConcurrent implements FlowCounterMBean {
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
+ public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
+ "Using Concurrent implementation of Flow Writer.";
private final DataBroker dataBroker;
private final ExecutorService flowPusher;
private long startTime;
- private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
- private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
- private AtomicLong taskCompletionTime = new AtomicLong(0);
- private final String UNITS = "ns";
+ private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+ private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+ private final AtomicLong taskCompletionTime = new AtomicLong();
public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
this.dataBroker = dataBroker;
this.flowPusher = flowPusher;
- LOG.info("Using Concurrent implementation of Flow Writer.");
+ LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
}
- public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
- int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
- LOG.info("Using Concurrent implementation of Flow Writer.");
+ public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
+ short startTableId, short endTableId, boolean isCreateParents) {
+ LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
countDpnWriteCompletion.set(dpnCount);
startTime = System.nanoTime();
for (int i = 1; i <= dpnCount; i++) {
- FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
- flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+ FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
+ sleepAfter, startTableId, endTableId, isCreateParents);
flowPusher.execute(task);
}
}
- public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
- short startTableId, short endTableId) {
- LOG.info("Using Concurrent implementation of Flow Writer.");
+ public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
+ short endTableId) {
+ LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
countDpnWriteCompletion.set(dpnCount);
for (int i = 1; i <= dpnCount; i++) {
- FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
- 0, 1, startTableId, endTableId);
+ FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
+ startTableId, endTableId, false);
flowPusher.execute(task);
}
}
- @Override
- public long getFlowCount() {
- return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
- }
-
- @Override
- public int getReadOpStatus() {
- return BulkOMaticUtils.DEFUALT_STATUS;
- }
-
@Override
public int getWriteOpStatus() {
return writeOpStatus.get();
return taskCompletionTime.get();
}
- @Override
- public String getUnits() {
- return UNITS;
- }
-
private class FlowHandlerTask implements Runnable {
private final String dpId;
private final boolean add;
private final int sleepMillis;
private final short startTableId;
private final short endTableId;
- private AtomicInteger remainingTxReturn = new AtomicInteger(0);
-
- public FlowHandlerTask(final String dpId,
- final int flowsPerDpn,
- final boolean add,
- final int batchSize,
- final int sleepMillis,
- final int sleepAfter,
- final short startTableId,
- final short endTableId){
+ private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
+ private final boolean isCreateParents;
+
+ FlowHandlerTask(final String dpId, final int flowsPerDpn,
+ final boolean add, final int batchSize,
+ final int sleepMillis, final int sleepAfter,
+ final short startTableId, final short endTableId,
+ final boolean isCreateParents) {
this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
this.add = add;
this.flowsPerDpn = flowsPerDpn;
this.sleepAfter = sleepAfter;
this.startTableId = startTableId;
this.endTableId = endTableId;
- remainingTxReturn.set(flowsPerDpn/batchSize);
+ this.isCreateParents = isCreateParents;
+ remainingTxReturn.set(flowsPerDpn / batchSize);
}
@Override
public void run() {
- LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
+ LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
+ flowsPerDpn / batchSize);
writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
short tableId = startTableId;
- int numSubmits = flowsPerDpn/batchSize;
+ int numSubmits = flowsPerDpn / batchSize;
int sourceIp = 1;
int newBatchSize = batchSize;
for (int i = 1; i <= numSubmits; i++) {
WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
- short k = tableId;
+ short calculatedTableId = tableId;
for (; sourceIp <= newBatchSize; sourceIp++) {
- String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+ String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
Flow flow = null;
if (add) {
Match match = BulkOMaticUtils.getMatch(sourceIp);
- flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+ flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
}
addFlowToTx(writeTransaction, flowId,
- BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
+ BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
+ calculatedTableId);
if (sourceIp < newBatchSize) {
- short a = 1;
- short b = (short)(endTableId - startTableId + 1);
- k = (short) (((k + a) % b) + startTableId);
+ short numberA = 1;
+ short numberB = (short) (endTableId - startTableId + 1);
+ calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
}
}
- Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
+ Futures.addCallback(writeTransaction.submit(),
+ new DsCallBack(dpId, tableId, calculatedTableId, sourceIp));
// Wrap around
- tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
+ tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
newBatchSize += batchSize;
- if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
+ if (i % sleepAfter == 0 && sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
}
}
- private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
+ private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
+ Flow flow, Integer sourceIp, Short tableId) {
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
}
private class DsCallBack implements FutureCallback {
- private String dpId;
- private int sourceIp;
- private short endTableId;
- private short beginTableId;
+ private final String dpId;
+ private final int sourceIp;
+ private final short endTableId;
+ private final short beginTableId;
- public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+ DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
this.dpId = dpId;
this.sourceIp = sourceIp;
this.endTableId = endTableId;
}
@Override
- public void onSuccess(Object o) {
+ public void onSuccess(Object object) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
- LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
- dur);
- if(0 == countDpnWriteCompletion.decrementAndGet() &&
- writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
+ LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
+ if (0 == countDpnWriteCompletion.decrementAndGet()
+ && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
taskCompletionTime.set(dur);
}
}
}
+ @Override
public void onFailure(Throwable error) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
- LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
- dur);
+ LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
}
- LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
- "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
+ LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
+ + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
}
}
}
-}
\ No newline at end of file
+}