import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.Transaction;
import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
private final AtomicLong taskCompletionTime = new AtomicLong();
- public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
+ public FlowWriterTxChain(final DataBroker dataBroker, final ExecutorService flowPusher) {
this.dataBroker = dataBroker;
this.flowPusher = flowPusher;
LOG.info("Using Ping Pong Flow Tester Impl");
}
- public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
- short startTableId, short endTableId, boolean isCreateParents) {
+ public void addFlows(final Integer dpnCount, final Integer flowsPerDPN, final int batchSize, final int sleepMillis,
+ final int sleepAfter, final short startTableId, final short endTableId, final boolean isCreateParents) {
LOG.info("Using Transaction Chain Flow Writer Impl");
countDpnWriteCompletion.set(dpnCount);
startTime = System.nanoTime();
}
}
- public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
- short endTableId) {
+ public void deleteFlows(final Integer dpnCount, final Integer flowsPerDPN, final int batchSize,
+ final short startTableId, final short endTableId) {
LOG.info("Using Transaction Chain Flow Writer Impl");
countDpnWriteCompletion.set(dpnCount);
for (int i = 1; i <= dpnCount; i++) {
return taskCompletionTime.get();
}
- private class FlowHandlerTask implements Runnable, TransactionChainListener {
+ private class FlowHandlerTask implements Runnable, FutureCallback<Empty> {
private final String dpId;
private final boolean add;
private final int flowsPerDpn;
private final boolean isCreateParents;
private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
- private TransactionChain txChain;
+ private TransactionChain txChain = null;
FlowHandlerTask(final String dpId,
final int flowsPerDpn,
int newBatchSize = batchSize;
LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
- txChain = dataBroker.createMergingTransactionChain(this);
+ txChain = dataBroker.createMergingTransactionChain();
+ txChain.addCallback(this);
LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
for (int i = 1; i <= numSubmits; i++) {
}
@Override
- public void onTransactionChainFailed(TransactionChain transactionChain,
- Transaction asyncTransaction, Throwable throwable) {
- LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
- asyncTransaction.getIdentifier(), throwable);
- transactionChain.close();
+ public void onFailure(final Throwable throwable) {
+ LOG.error("Transaction chain: {} FAILED due to: ", txChain, throwable);
+ txChain.close();
}
@Override
- public void onTransactionChainSuccessful(TransactionChain transactionChain) {
- LOG.info("Transaction chain: {} closed successfully.", transactionChain);
+ public void onSuccess(final Empty result) {
+ LOG.info("Transaction chain: {} closed successfully.", txChain);
}
- private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
- Flow flow, Integer sourceIp, Short tableId) {
+ private void writeTxToDs(final WriteTransaction writeTransaction, final String flowId,
+ final InstanceIdentifier<Flow> flowIid, final Flow flow, final Integer sourceIp, final Short tableId) {
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
+ if (isCreateParents) {
+ writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+ } else {
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+ }
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
private final short beginTableId;
private final TransactionChain txChain;
- DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
- TransactionChain txChain) {
+ DsCallBack(final String dpId, final short beginTableId, final short endTableId, final int sourceIp,
+ final TransactionChain txChain) {
this.dpId = dpId;
this.sourceIp = sourceIp;
this.endTableId = endTableId;
}
@Override
- public void onSuccess(Object notUsed) {
+ public void onSuccess(final Object notUsed) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
}
@Override
- public void onFailure(Throwable error) {
+ public void onFailure(final Throwable error) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);