public static final int DEFUALT_STATUS = FlowCounter.OperationStatus.INIT.status();
public static final int DEFAULT_FLOW_COUNT = 0;
+ public static final int DEFAULT_TABLE_COUNT = 0;
public static final long DEFAULT_COMPLETION_TIME = 0;
public static final String DEFAULT_UNITS = "ns";
public static final String DEVICE_TYPE_PREFIX = "openflow:";
return BulkOMaticUtils.DEFAULT_UNITS;
}
}
+
+ /** Delegates to the currently registered writer when one is set; otherwise returns the default table count. */
+ @Override
+ public long getTableCount() {
+ if (writer != null) {
+ return writer.getTableCount();
+ }
+ return BulkOMaticUtils.DEFAULT_TABLE_COUNT;
+ }
}
public interface FlowCounterMBean {
- public long getFlowCount();
+ /** @return flows processed so far, or {@code DEFAULT_FLOW_COUNT} when the implementation does not track flows. */
+ default long getFlowCount() {
+ return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
+ }
- public int getReadOpStatus();
+ /** @return read-operation status code (see {@code FlowCounter.OperationStatus}). */
+ default int getReadOpStatus() {
+ return BulkOMaticUtils.DEFUALT_STATUS;
+ }
- public int getWriteOpStatus();
+ /** @return write-operation status code (see {@code FlowCounter.OperationStatus}). */
+ default int getWriteOpStatus() {
+ return BulkOMaticUtils.DEFUALT_STATUS;
+ }
- public long getTaskCompletionTime();
+ /** @return task completion time, expressed in {@link #getUnits()}. */
+ default long getTaskCompletionTime() {
+ return BulkOMaticUtils.DEFAULT_COMPLETION_TIME;
+ }
- public String getUnits();
+ /** @return units for {@link #getTaskCompletionTime()} (default "ns"). */
+ default String getUnits() {
+ return BulkOMaticUtils.DEFAULT_UNITS;
+ }
+
+ /** @return tables written so far, or {@code DEFAULT_TABLE_COUNT} when the implementation does not track tables. */
+ default long getTableCount() {
+ return BulkOMaticUtils.DEFAULT_TABLE_COUNT;
+ }
}
package org.opendaylight.openflowplugin.applications.bulk.o.matic;
import com.google.common.base.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
public class FlowReader implements Runnable, FlowCounterMBean {
private static final Logger LOG = LoggerFactory.getLogger(FlowReader.class);
private final DataBroker dataBroker;
private final short startTableId;
private final short endTableId;
private final boolean isConfigDs;
- private AtomicLong flowCount = new AtomicLong(0);
+ private AtomicLong flowCount = new AtomicLong();
private AtomicInteger readOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
- private static final String UNITS = "ns";
private FlowReader(final DataBroker dataBroker,
final Integer dpnCount,
public int getReadOpStatus() {
return readOpStatus.get();
}
-
- @Override
- public int getWriteOpStatus() {
- return BulkOMaticUtils.DEFUALT_STATUS;
- }
-
- @Override
- public long getTaskCompletionTime() {
- return BulkOMaticUtils.DEFAULT_COMPLETION_TIME;
- }
-
- @Override
- public String getUnits() {
- return UNITS;
- }
}
\ No newline at end of file
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
public class FlowWriterConcurrent implements FlowCounterMBean {
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
private final ExecutorService flowPusher;
private long startTime;
private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
- private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
- private AtomicLong taskCompletionTime = new AtomicLong(0);
- private static final String UNITS = "ns";
+ private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+ private AtomicLong taskCompletionTime = new AtomicLong();
public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
this.dataBroker = dataBroker;
}
public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
- int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
+ int sleepMillis, int sleepAfter, short startTableId, short endTableId,
+ boolean isCreateParents) {
LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
countDpnWriteCompletion.set(dpnCount);
startTime = System.nanoTime();
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
- flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+ flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
flowPusher.execute(task);
}
}
countDpnWriteCompletion.set(dpnCount);
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
- 0, 1, startTableId, endTableId);
+ 0, 1, startTableId, endTableId, false);
flowPusher.execute(task);
}
}
- @Override
- public long getFlowCount() {
- return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
- }
-
- @Override
- public int getReadOpStatus() {
- return BulkOMaticUtils.DEFUALT_STATUS;
- }
-
@Override
public int getWriteOpStatus() {
return writeOpStatus.get();
return taskCompletionTime.get();
}
- @Override
- public String getUnits() {
- return UNITS;
- }
-
private class FlowHandlerTask implements Runnable {
private final String dpId;
private final boolean add;
private final short startTableId;
private final short endTableId;
private AtomicInteger remainingTxReturn = new AtomicInteger(0);
+ private final boolean isCreateParents;
public FlowHandlerTask(final String dpId,
final int flowsPerDpn,
final int sleepMillis,
final int sleepAfter,
final short startTableId,
- final short endTableId){
+ final short endTableId,
+ final boolean isCreateParents){
this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
this.add = add;
this.flowsPerDpn = flowsPerDpn;
this.sleepAfter = sleepAfter;
this.startTableId = startTableId;
this.endTableId = endTableId;
+ this.isCreateParents = isCreateParents;
remainingTxReturn.set(flowsPerDpn/batchSize);
}
private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
package org.opendaylight.openflowplugin.applications.bulk.o.matic;
import com.google.common.base.Optional;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class FlowWriterDirectOFRpc {
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpc.class);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
public class FlowWriterSequential implements FlowCounterMBean {
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
private final DataBroker dataBroker;
protected int dpnCount;
private long startTime;
private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
- private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
- private AtomicLong taskCompletionTime = new AtomicLong(0);
- private static final String UNITS = "ns";
+ private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+ private AtomicLong taskCompletionTime = new AtomicLong();
public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
this.dataBroker = dataBroker;
}
public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
- short startTableId, short endTableId) {
+ short startTableId, short endTableId, boolean isCreateParents) {
LOG.info("Using Sequential implementation of Flow Writer.");
this.dpnCount = dpnCount;
countDpnWriteCompletion.set(dpnCount);
startTime = System.nanoTime();
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
- sleepMillis, startTableId, endTableId);
+ sleepMillis, startTableId, endTableId, isCreateParents);
flowPusher.execute(task);
}
}
countDpnWriteCompletion.set(dpnCount);
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
- startTableId, endTableId);
+ startTableId, endTableId, false);
flowPusher.execute(task);
}
}
- @Override
- public long getFlowCount() {
- return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
- }
-
- @Override
- public int getReadOpStatus() {
- return BulkOMaticUtils.DEFUALT_STATUS;
- }
-
@Override
public int getWriteOpStatus() {
return writeOpStatus.get();
return taskCompletionTime.get();
}
- @Override
- public String getUnits() {
- return UNITS;
- }
-
private class FlowHandlerTask implements Runnable {
private final String dpId;
private final int flowsPerDpn;
private final int sleepMillis;
private final short startTableId;
private final short endTableId;
+ private final boolean isCreateParents;
public FlowHandlerTask(final String dpId,
final int flowsPerDpn,
final int batchSize,
int sleepMillis,
final short startTableId,
- final short endTableId){
+ final short endTableId,
+ final boolean isCreateParents){
this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
this.add = add;
this.flowsPerDpn = flowsPerDpn;
this.sleepMillis = sleepMillis;
this.startTableId = startTableId;
this.endTableId = endTableId;
+ this.isCreateParents = isCreateParents;
}
@Override
Flow flow) {
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
public class FlowWriterTxChain implements FlowCounterMBean {
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
private final DataBroker dataBroker;
private final ExecutorService flowPusher;
private long startTime;
private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
- private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
- private AtomicLong taskCompletionTime = new AtomicLong(0);
- private final String UNITS = "ns";
+ private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+ private AtomicLong taskCompletionTime = new AtomicLong();
public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
this.dataBroker = dataBroker;
}
public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
- int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
+ int sleepMillis, int sleepAfter, short startTableId, short endTableId,
+ boolean isCreateParents) {
LOG.info("Using Transaction Chain Flow Writer Impl");
countDpnWriteCompletion.set(dpnCount);
startTime = System.nanoTime();
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
- flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
+ flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
flowPusher.execute(task);
}
}
countDpnWriteCompletion.set(dpnCount);
for (int i = 1; i <= dpnCount; i++) {
FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
- 0, 1, startTableId, endTableId);
+ 0, 1, startTableId, endTableId, false);
flowPusher.execute(task);
}
}
- @Override
- public long getFlowCount() {
- return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
- }
-
- @Override
- public int getReadOpStatus() {
- return BulkOMaticUtils.DEFUALT_STATUS;
- }
-
@Override
public int getWriteOpStatus() {
return writeOpStatus.get();
return taskCompletionTime.get();
}
- @Override
- public String getUnits() {
- return UNITS;
- }
-
private class FlowHandlerTask implements Runnable, TransactionChainListener {
private final String dpId;
private final boolean add;
private final int sleepMillis;
private final short startTableId;
private final short endTableId;
+ private final boolean isCreateParents;
private AtomicInteger remainingTxReturn = new AtomicInteger(0);
BindingTransactionChain txChain;
final int sleepMillis,
final int sleepAfter,
final short startTableId,
- final short endTableId){
+ final short endTableId,
+ final boolean isCreateParents){
this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
this.add = add;
this.flowsPerDpn = flowsPerDpn;
this.sleepAfter = sleepAfter;
this.startTableId = startTableId;
this.endTableId = endTableId;
+ this.isCreateParents = isCreateParents;
remainingTxReturn.set(flowsPerDpn/batchSize);
}
private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
- input.getEndTableId().shortValue());
+ input.getEndTableId().shortValue(), input.isCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
if (input.isIsAdd()){
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
- input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+ input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
+ input.isCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
- input.getEndTableId().shortValue());
+ input.getEndTableId().shortValue(), input.isCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
return Futures.immediateFuture(rpcResultBuilder.build());
}
+ /**
+  * RPC handler for table-test: adds or deletes the requested range of tables
+  * across the given number of DPNs using a freshly created {@link TableWriter}.
+  * The writer is also registered with the counter bean so its progress counters
+  * can be reported. An unrecognized operation yields a failed RPC result.
+  */
+ @Override
+ public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
+ final TableWriter writer = new TableWriter(dataBroker, fjService);
+ flowCounterBeanImpl.setWriter(writer);
+ switch (input.getOperation()) {
+ case Add:
+ writer.addTables(input.getDpnCount().intValue(),
+ input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+ break;
+ case Delete:
+ writer.deleteTables(input.getDpnCount().intValue(),
+ input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+ break;
+ default:
+ RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
+ return Futures.immediateFuture(rpcResultBuilder.build());
+ }
+ RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
+ return Futures.immediateFuture(rpcResultBuilder.build());
+ }
+
@Override
public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.bulk.o.matic;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bulk-writes (or deletes) OpenFlow {@link Table} entries in the CONFIG
+ * datastore, one asynchronous transaction per table per DPN, exposing
+ * progress counters through {@link FlowCounterMBean}. The overall status
+ * flips to SUCCESS or FAILURE once every submitted transaction has completed.
+ */
+public class TableWriter implements FlowCounterMBean {
+
+    // SLF4J convention: one static final logger per class.
+    private static final Logger LOG = LoggerFactory.getLogger(TableWriter.class);
+
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicLong taskCompletionTime = new AtomicLong(BulkOMaticUtils.DEFAULT_COMPLETION_TIME);
+    private final AtomicInteger successfulWrites = new AtomicInteger();
+    private final AtomicInteger failedWrites = new AtomicInteger();
+    private final DataBroker dataBroker;
+    private final ExecutorService tablePusher;
+    // Recorded when the handler task starts; used for taskCompletionTime (ns).
+    private volatile long startTime;
+
+    public TableWriter(final DataBroker dataBroker, final ExecutorService tablePusher) {
+        this.dataBroker = dataBroker;
+        this.tablePusher = tablePusher;
+    }
+
+    /** Asynchronously creates tables [startTableId, endTableId] on each of {@code dpnCount} DPNs. */
+    public void addTables(final int dpnCount, final short startTableId, final short endTableId) {
+        LOG.info("Starting to add tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
+        tablePusher.execute(new TableHandlerTask(dpnCount, startTableId, endTableId, true));
+    }
+
+    /** Asynchronously deletes tables [startTableId, endTableId] on each of {@code dpnCount} DPNs. */
+    public void deleteTables(int dpnCount, short startTableId, short endTableId) {
+        LOG.info("Starting to delete tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
+        tablePusher.execute(new TableHandlerTask(dpnCount, startTableId, endTableId, false));
+    }
+
+    @Override
+    public int getWriteOpStatus() {
+        return writeOpStatus.get();
+    }
+
+    @Override
+    public long getTaskCompletionTime() {
+        return taskCompletionTime.get();
+    }
+
+    @Override
+    public long getTableCount() {
+        // Number of successfully committed table transactions so far.
+        return successfulWrites.get();
+    }
+
+    /**
+     * Marks the operation finished once every transaction has completed,
+     * successfully or not. Comparing each counter against the total on its own
+     * would leave the status stuck at IN_PROGRESS whenever successes and
+     * failures are mixed, so the two counters are summed instead.
+     */
+    private void markCompletedIfDone(final int totalTables) {
+        if (successfulWrites.get() + failedWrites.get() == totalTables) {
+            writeOpStatus.set(failedWrites.get() > 0
+                    ? FlowCounter.OperationStatus.FAILURE.status()
+                    : FlowCounter.OperationStatus.SUCCESS.status());
+            taskCompletionTime.set(System.nanoTime() - startTime);
+        }
+    }
+
+    /** Submits one write-only transaction per (DPN, table id) pair and tracks the outcomes. */
+    private class TableHandlerTask implements Runnable {
+
+        private final short startTableId;
+        private final short endTableId;
+        private final int dpnCount;
+        private final boolean isAdd;
+
+        TableHandlerTask(int dpnCount, short startTableId, short endTableId, boolean isAdd) {
+            this.dpnCount = dpnCount;
+            this.startTableId = startTableId;
+            this.endTableId = endTableId;
+            this.isAdd = isAdd;
+        }
+
+        @Override
+        public void run() {
+            startTime = System.nanoTime();
+            writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
+            final int totalTables = dpnCount * (endTableId - startTableId + 1);
+
+            for (int dpn = 1; dpn <= dpnCount; dpn++) {
+                String dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpn;
+                for (short tableId = startTableId; tableId <= endTableId; tableId++) {
+                    WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
+                    InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
+
+                    if (isAdd) {
+                        Table table = new TableBuilder().setKey(new TableKey(tableId))
+                                .setId(tableId)
+                                .build();
+                        // createParents=true: the parent node may not exist yet in CONFIG.
+                        wtx.put(LogicalDatastoreType.CONFIGURATION, tableIId, table, true);
+                    } else {
+                        wtx.delete(LogicalDatastoreType.CONFIGURATION, tableIId);
+                    }
+
+                    CheckedFuture<Void, TransactionCommitFailedException> future = wtx.submit();
+
+                    Futures.addCallback(future, new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(@Nullable Void result) {
+                            successfulWrites.incrementAndGet();
+                            markCompletedIfDone(totalTables);
+                        }
+
+                        @Override
+                        public void onFailure(Throwable throwable) {
+                            // Pass the throwable as the trailing argument so the
+                            // stack trace is logged (a {} placeholder is not).
+                            LOG.error("Table transaction failed for {}", tableIId, throwable);
+                            failedWrites.incrementAndGet();
+                            markCompletedIfDone(totalTables);
+                        }
+                    });
+                }
+            }
+        }
+    }
+}
}
rpc flow-test {
- input {
- leaf is-add {
- type boolean;
- mandatory true;
- status current;
- description "Add or delete";
- }
- leaf dpn-count {
- type uint32;
- mandatory true;
- status current;
- description "No of DPNs";
- }
- leaf flows-per-dpn {
- type uint32;
- mandatory true;
- status current;
- description "Flows to be pushed per DPN";
- }
- leaf start-table-id {
- type uint32;
- mandatory true;
- status current;
- description "Start adding flows from this table id";
- }
- leaf end-table-id {
- type uint32;
- mandatory true;
- status current;
- description "The last table to add flows to and then wrap around";
- }
- leaf batch-size {
- type uint32;
- mandatory true;
- status current;
- description "batch size";
- }
- leaf seq {
- type boolean;
- mandatory true;
- status current;
- description "Whether to use sequential or concurrent writer";
- }
- leaf tx-chain {
- type boolean;
- mandatory true;
- status current;
- description "Whether to use PingPong Broker or not. seq is ignored.";
- }
- leaf sleep-for {
- type uint32;
- mandatory true;
- status current;
- description "sleep for the given milliseconds";
- }
- leaf sleep-after {
- type uint32;
- mandatory true;
- status current;
- description "Sleep after the given number of iterations. Will be used in the concurrent case only";
- }
- }
+ input {
+ leaf create-parents {
+ type boolean;
+ mandatory true;
+ status current;
+ description "Create parents if doesn't exist";
+ }
+ leaf is-add {
+ type boolean;
+ mandatory true;
+ status current;
+ description "Add or delete";
+ }
+ leaf dpn-count {
+ type uint32;
+ mandatory true;
+ status current;
+ description "No of DPNs";
+ }
+ leaf flows-per-dpn {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Flows to be pushed per DPN";
+ }
+ leaf start-table-id {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Start adding flows from this table id";
+ }
+ leaf end-table-id {
+ type uint32;
+ mandatory true;
+ status current;
+ description "The last table to add flows to and then wrap around";
+ }
+ leaf batch-size {
+ type uint32;
+ mandatory true;
+ status current;
+ description "batch size";
+ }
+ leaf seq {
+ type boolean;
+ mandatory true;
+ status current;
+ description "Whether to use sequential or concurrent writer";
+ }
+ leaf tx-chain {
+ type boolean;
+ mandatory true;
+ status current;
+ description "Whether to use PingPong Broker or not. seq is ignored.";
+ }
+ leaf sleep-for {
+ type uint32;
+ mandatory true;
+ status current;
+ description "sleep for the given milliseconds";
+ }
+ leaf sleep-after {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Sleep after the given number of iterations. Will be used in the concurrent case only";
+ }
+ }
}
rpc read-flow-test {
}
}
}
+
+ rpc table-test {
+ input {
+ leaf operation {
+ type enumeration {
+ enum add;
+ enum delete;
+ }
+ status current;
+ description "Type of operation, add or delete";
+ }
+
+ leaf dpn-count {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Total number of dpns to add these tables";
+ }
+
+ leaf start-table-id {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Starting table id";
+ }
+
+ leaf end-table-id {
+ type uint32;
+ mandatory true;
+ status current;
+ description "Last table id";
+ }
+ }
+ }
}
}
@Test
public void testAddFlows() throws Exception {
- flowWriterConcurrent.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1);
+ flowWriterConcurrent.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1, true);
Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
}
}
@Test
public void testAddFlows() throws Exception {
- flowWriterSequential.addFlows(1, FLOWS_PER_DPN, 10, 10, (short)0, (short)1);
+ flowWriterSequential.addFlows(1, FLOWS_PER_DPN, 10, 10, (short)0, (short)1, true);
Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
}
}
@Test
public void testAddFlows() throws Exception {
- flowWriterTxChain.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1);
+ flowWriterTxChain.addFlows(1, FLOWS_PER_DPN, 10, 10, 10, (short)0, (short)1, true);
Mockito.verify(wTx, Mockito.times(FLOWS_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItemBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.list.grouping.BulkFlowItem;
.setSleepAfter(20L)
.setSleepFor(1L)
.setStartTableId(1L)
- .setTxChain(true);
+ .setTxChain(true)
+ .setCreateParents(true);
FlowTestInput flowTestInput = flowTestInputBuilder.build();
Assert.assertTrue(salBulkFlowService.flowRpcAddMultiple(flowRpcAddMultipleInput).get().isSuccessful());
}
+
+ @Test
+ public void testTableTest() throws Exception {
+ final TableTestInputBuilder tableTestInputBuilder = new TableTestInputBuilder()
+ .setStartTableId(0L)
+ .setEndTableId(99L)
+ .setDpnCount(1L)
+ .setOperation(Operation.Add);
+
+ TableTestInput tableTestInput = tableTestInputBuilder.build();
+
+ Assert.assertTrue(salBulkFlowService.tableTest(tableTestInput).get().isSuccessful());
+
+ tableTestInputBuilder.setOperation(Operation.Delete);
+ tableTestInput = tableTestInputBuilder.build();
+
+ Assert.assertTrue(salBulkFlowService.tableTest(tableTestInput).get().isSuccessful());
+ }
}
\ No newline at end of file
--- /dev/null
+/**
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.bulk.o.matic;
+
+import static org.mockito.Mockito.doReturn;
+
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.ExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link TableWriter}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TableWriterTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TableWriterTest.class);
+
+    private static final int TABLES_PER_DPN = 100;
+    private static final int DPN_COUNT = 1;
+    private static final short START_TABLE_ID = 0;
+    private static final short END_TABLE_ID = 99;
+
+    @Mock
+    private DataBroker mockDataBroker;
+    @Mock
+    private ExecutorService mockTablePusher;
+    @Mock
+    private WriteTransaction wTx;
+
+    private TableWriter tableWriter;
+
+    @Before
+    public void setUp() throws Exception {
+
+        doReturn(wTx).when(mockDataBroker).newWriteOnlyTransaction();
+        Mockito.when(wTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+
+        // Run submitted tasks synchronously so assertions can follow directly.
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                ((Runnable)invocation.getArguments()[0]).run();
+                return null;
+            }
+        }).when(mockTablePusher).execute(Matchers.<Runnable>any());
+
+        tableWriter = new TableWriter(mockDataBroker, mockTablePusher);
+    }
+    @Test
+    public void testAddTables() throws Exception {
+        tableWriter.addTables(DPN_COUNT, START_TABLE_ID, END_TABLE_ID);
+        Mockito.verify(wTx, Mockito.times(TABLES_PER_DPN)).put(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any(), Matchers.<DataObject>any(), Matchers.anyBoolean());
+    }
+
+    @Test
+    public void testDeleteTables() throws Exception {
+        tableWriter.deleteTables(DPN_COUNT, START_TABLE_ID, END_TABLE_ID);
+        Mockito.verify(wTx, Mockito.times(TABLES_PER_DPN)).delete(Matchers.<LogicalDatastoreType>any(), Matchers.<InstanceIdentifier<DataObject>>any());
+    }
+}
\ No newline at end of file