import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
private final boolean isCreateParents;
private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
- BindingTransactionChain txChain;
+ private BindingTransactionChain txChain;
FlowHandlerTask(final String dpId,
final int flowsPerDpn,
LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
tableId, calculatedTableId, sourceIp - 1);
Futures.addCallback(writeTransaction.submit(),
- new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
+ new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
+ MoreExecutors.directExecutor());
// Wrap around
tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
newBatchSize += batchSize;
}
}
- private class DsCallBack implements FutureCallback {
+ private class DsCallBack implements FutureCallback<Void> {
private final String dpId;
private final int sourceIp;
private final short endTableId;
private final short beginTableId;
+ private final BindingTransactionChain txChain;
- DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+ DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
+ BindingTransactionChain txChain) {
this.dpId = dpId;
this.sourceIp = sourceIp;
this.endTableId = endTableId;
this.beginTableId = beginTableId;
+ this.txChain = txChain;
}
@Override
- public void onSuccess(Object object) {
+ public void onSuccess(Void notUsed) {
if (remainingTxReturn.decrementAndGet() <= 0) {
long dur = System.nanoTime() - startTime;
LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);