package org.opendaylight.openflowplugin.applications.bulk.o.matic;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private final boolean isCreateParents;
private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
- private BindingTransactionChain txChain;
+ private TransactionChain txChain;
FlowHandlerTask(final String dpId,
final int flowsPerDpn,
int newBatchSize = batchSize;
LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
- txChain = dataBroker.createTransactionChain(this);
+ txChain = dataBroker.createMergingTransactionChain(this);
LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
for (int i = 1; i <= numSubmits; i++) {
}
LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
tableId, calculatedTableId, sourceIp - 1);
- Futures.addCallback(writeTransaction.submit(),
+ writeTransaction.commit().addCallback(
new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
MoreExecutors.directExecutor());
// Wrap around
}
@Override
- public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain,
- AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
+ public void onTransactionChainFailed(TransactionChain transactionChain,
+ Transaction asyncTransaction, Throwable throwable) {
LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
asyncTransaction.getIdentifier(), throwable);
transactionChain.close();
}
@Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
+ public void onTransactionChainSuccessful(TransactionChain transactionChain) {
LOG.info("Transaction chain: {} closed successfully.", transactionChain);
}
Flow flow, Integer sourceIp, Short tableId) {
if (add) {
LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
+ if (isCreateParents) {
+ writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+ } else {
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+ }
} else {
LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
}
}
- private class DsCallBack implements FutureCallback<Void> {
+ private class DsCallBack implements FutureCallback<Object> {
private final String dpId;
private final int sourceIp;
private final short endTableId;
private final short beginTableId;
- private final BindingTransactionChain txChain;
+ private final TransactionChain txChain;
DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
- BindingTransactionChain txChain) {
+ TransactionChain txChain) {
this.dpId = dpId;
this.sourceIp = sourceIp;
this.endTableId = endTableId;
}
@Override
- public void onSuccess(Void notUsed) {
+ public void onSuccess(Object notUsed) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);