import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
PathUtil.extractNodeId(input.getNode()),
input.getBatchRemoveFlows().size());
final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchFlowInputGrouping batchFlow : input.getBatchRemoveFlows()) {
+ for (BatchFlowInputGrouping batchFlow : input.nonnullBatchRemoveFlows().values()) {
final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.removeFlow(removeFlowInput)));
+ resultsLot.add(salFlowService.removeFlow(removeFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
Futures.transform(Futures.successfulAsList(resultsLot),
- FlowUtil.createCumulatingFunction(input.getBatchRemoveFlows()),
+ FlowUtil.createCumulatingFunction(input.nonnullBatchRemoveFlows().values()),
MoreExecutors.directExecutor());
ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture =
public ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchFlowInputGrouping batchFlow : input.getBatchAddFlows()) {
+ for (BatchFlowInputGrouping batchFlow : input.nonnullBatchAddFlows().values()) {
final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.addFlow(addFlowInput)));
+ resultsLot.add(salFlowService.addFlow(addFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
Futures.transform(Futures.successfulAsList(resultsLot),
- FlowUtil.createCumulatingFunction(input.getBatchAddFlows()),
+ FlowUtil.createCumulatingFunction(input.nonnullBatchAddFlows().values()),
MoreExecutors.directExecutor());
ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
PathUtil.extractNodeId(input.getNode()),
input.getBatchUpdateFlows().size());
final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchUpdateFlows batchFlow : input.getBatchUpdateFlows()) {
+ for (BatchUpdateFlows batchFlow : input.nonnullBatchUpdateFlows().values()) {
final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
.setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
.setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.updateFlow(updateFlowInput)));
+ resultsLot.add(salFlowService.updateFlow(updateFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
Futures.transform(Futures.successfulAsList(resultsLot),
- FlowUtil.createCumulatingFunction(input.getBatchUpdateFlows()),
+ FlowUtil.createCumulatingFunction(input.nonnullBatchUpdateFlows().values()),
MoreExecutors.directExecutor());
ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture =